/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Action that looks up partitions through the {@link PartitionMetadataDao}, marks them as
 * scheduled and emits them to the given {@link DoFn.OutputReceiver}.
 *
 * <p>Each invocation of {@link #run} reads the partitions created after the start of the
 * tracker's current restriction, groups them by creation timestamp and processes the groups in
 * ascending creation order. When the DAO reports no unfinished minimum watermark, the action
 * claims the end of the restriction and stops.
 */
public class DetectNewPartitionsAction {
    private static final Logger LOG = LoggerFactory.getLogger(DetectNewPartitionsAction.class);
    private final PartitionMetadataDao dao;
    private final PartitionMetadataMapper mapper;
    private final ChangeStreamMetrics metrics;
    private final Duration resumeDuration;

    /**
     * Creates the action.
     *
     * @param dao data access object used to query and update partition metadata
     * @param mapper converts rows returned by the DAO into {@link PartitionMetadata}
     * @param metrics sink for the partition counters and latencies recorded by this action
     * @param resumeDuration delay requested before the next invocation when the action resumes
     */
    public DetectNewPartitionsAction(PartitionMetadataDao dao, PartitionMetadataMapper mapper, ChangeStreamMetrics metrics, Duration resumeDuration) {
        this.dao = dao;
        this.mapper = mapper;
        this.metrics = metrics;
        this.resumeDuration = resumeDuration;
    }

    /**
     * Executes one detection round.
     *
     * @param tracker restriction tracker; its current restriction's {@code from} is used as the
     *     read timestamp and each batch creation timestamp is claimed against it
     * @param receiver receives the detected partitions
     * @param watermarkEstimator updated with the unfinished minimum watermark, when one exists
     * @return {@code resume()} with the configured delay after all batches were claimed and
     *     emitted; {@code stop()} when a claim is rejected or when the DAO returns no unfinished
     *     minimum watermark
     */
    public DoFn.ProcessContinuation run(RestrictionTracker<TimestampRange, Timestamp> tracker, DoFn.OutputReceiver<PartitionMetadata> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator) {
        final Timestamp readTimestamp = tracker.currentRestriction().getFrom();
        final Timestamp minWatermark = this.dao.getUnfinishedMinWatermark();
        if (minWatermark != null) {
            return this.processPartitions(tracker, receiver, watermarkEstimator, minWatermark, readTimestamp);
        }
        // No unfinished minimum watermark was returned: finish the restriction.
        return this.terminate(tracker);
    }

    /**
     * Advances the watermark to {@code minWatermark}, loads the partitions created after
     * {@code readTimestamp} and schedules them batch by batch.
     */
    private DoFn.ProcessContinuation processPartitions(RestrictionTracker<TimestampRange, Timestamp> tracker, DoFn.OutputReceiver<PartitionMetadata> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator, Timestamp minWatermark, Timestamp readTimestamp) {
        watermarkEstimator.setWatermark(new Instant(minWatermark.toSqlTimestamp()));
        final List<PartitionMetadata> partitions = this.getAllPartitionsCreatedAfter(readTimestamp);
        final TreeMap<Timestamp, List<PartitionMetadata>> batches = this.batchByCreatedAt(partitions);
        return this.schedulePartitions(tracker, receiver, minWatermark, batches);
    }

    /**
     * Drains the DAO result set for {@code readTimestamp} into a list, mapping every row to a
     * {@link PartitionMetadata}. The result set is closed before returning.
     */
    private List<PartitionMetadata> getAllPartitionsCreatedAfter(Timestamp readTimestamp) {
        final List<PartitionMetadata> partitions = new ArrayList<>();
        try (ResultSet resultSet = this.dao.getAllPartitionsCreatedAfter(readTimestamp)) {
            while (resultSet.next()) {
                partitions.add(this.mapper.from(resultSet.getCurrentRowAsStruct()));
            }
        }
        LOG.info("Found {} to be scheduled (readTimestamp = {})", partitions.size(), readTimestamp);
        return partitions;
    }

    /**
     * Groups the partitions by their creation timestamp. The returned {@link TreeMap} iterates
     * in ascending key order, so batches are later processed from oldest to newest.
     */
    private TreeMap<Timestamp, List<PartitionMetadata>> batchByCreatedAt(List<PartitionMetadata> partitions) {
        return partitions.stream().collect(Collectors.groupingBy(PartitionMetadata::getCreatedAt, TreeMap::new, Collectors.toList()));
    }

    /**
     * Processes the batches in key order: marks the batch as scheduled, claims its creation
     * timestamp and, if the claim succeeds, emits the batch.
     *
     * @return {@code stop()} as soon as a claim is rejected, otherwise {@code resume()} with the
     *     configured resume delay
     */
    private DoFn.ProcessContinuation schedulePartitions(RestrictionTracker<TimestampRange, Timestamp> tracker, DoFn.OutputReceiver<PartitionMetadata> receiver, Timestamp minWatermark, TreeMap<Timestamp, List<PartitionMetadata>> batches) {
        for (Map.Entry<Timestamp, List<PartitionMetadata>> batch : batches.entrySet()) {
            final Timestamp batchCreatedAt = batch.getKey();
            final List<PartitionMetadata> batchPartitions = batch.getValue();
            // NOTE(review): the batch is marked as scheduled before the claim is attempted, so
            // a rejected claim leaves it updated through the DAO but not emitted by this
            // invocation. Ordering kept as-is; confirm it is intended.
            final Timestamp scheduledAt = this.updateBatchToScheduled(batchPartitions);
            if (!tracker.tryClaim(batchCreatedAt)) {
                return DoFn.ProcessContinuation.stop();
            }
            this.outputBatch(receiver, minWatermark, batchPartitions, scheduledAt);
        }
        return DoFn.ProcessContinuation.resume().withResumeDelay(this.resumeDuration);
    }

    /**
     * Passes the tokens of all partitions in the batch to
     * {@link PartitionMetadataDao#updateToScheduled} and returns the timestamp it reports.
     */
    private Timestamp updateBatchToScheduled(List<PartitionMetadata> batchPartitions) {
        final List<String> batchPartitionTokens = batchPartitions.stream().map(PartitionMetadata::getPartitionToken).collect(Collectors.toList());
        return this.dao.updateToScheduled(batchPartitionTokens);
    }

    /**
     * Emits every partition of the batch with {@code minWatermark} as the element timestamp and
     * records the partition count and the created-to-scheduled latency.
     */
    private void outputBatch(DoFn.OutputReceiver<PartitionMetadata> receiver, Timestamp minWatermark, List<PartitionMetadata> batchPartitions, Timestamp scheduledAt) {
        // The element timestamp is the same for the whole batch, so build it once.
        final Instant outputTimestamp = new Instant(minWatermark.toSqlTimestamp());
        for (PartitionMetadata partition : batchPartitions) {
            final Timestamp createdAt = partition.getCreatedAt();
            final PartitionMetadata updatedPartition = partition.toBuilder().setScheduledAt(scheduledAt).build();
            LOG.info("[{}] Scheduled partition at {} with start time {} and end time {}", updatedPartition.getPartitionToken(), updatedPartition.getScheduledAt(), updatedPartition.getStartTimestamp(), updatedPartition.getEndTimestamp());
            // NOTE(review): the element emitted is the original partition, not updatedPartition
            // (which is only used for the log line above). Kept as-is; confirm that consumers
            // do not need scheduledAt on the emitted element.
            receiver.outputWithTimestamp(partition, outputTimestamp);
            this.metrics.incPartitionRecordCount();
            this.metrics.updatePartitionCreatedToScheduled(new Duration(createdAt.toSqlTimestamp().getTime(), scheduledAt.toSqlTimestamp().getTime()));
        }
    }

    /**
     * Claims the end of the current restriction and stops. The result of the claim is not
     * inspected; the action stops either way.
     */
    private DoFn.ProcessContinuation terminate(RestrictionTracker<TimestampRange, Timestamp> tracker) {
        tracker.tryClaim(tracker.currentRestriction().getTo());
        LOG.info("All partitions have been processed, stopping");
        return DoFn.ProcessContinuation.stop();
    }
}

