/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;

import com.google.cloud.bigtable.data.v2.models.Range;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.GenerateInitialPartitionsAction;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ProcessNewPartitionsAction;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ResumeFromPreviousPipelineAction;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.DetectNewPartitionsState;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.InitialPipelineState;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.StreamPartitionWithWatermark;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.reconciler.OrphanedMetadataCleaner;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.reconciler.PartitionReconciler;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class DetectNewPartitionsAction {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(DetectNewPartitionsAction.class);
    private static final @UnknownKeyFor @NonNull @Initialized Duration DEBUG_WATERMARK_DELAY = Duration.standardMinutes((long)5L);
    private final @UnknownKeyFor @NonNull @Initialized ChangeStreamMetrics metrics;
    private final @UnknownKeyFor @NonNull @Initialized MetadataTableDao metadataTableDao;
    @Nullable
    private final @UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized Instant endTime;
    private final @UnknownKeyFor @NonNull @Initialized ProcessNewPartitionsAction processNewPartitionsAction;
    private final @UnknownKeyFor @NonNull @Initialized GenerateInitialPartitionsAction generateInitialPartitionsAction;
    private final @UnknownKeyFor @NonNull @Initialized ResumeFromPreviousPipelineAction resumeFromPreviousPipelineAction;
    private transient @UnknownKeyFor @NonNull @Initialized PartitionReconciler partitionReconciler;
    private transient @UnknownKeyFor @NonNull @Initialized OrphanedMetadataCleaner orphanedMetadataCleaner;

    public DetectNewPartitionsAction(@UnknownKeyFor @NonNull @Initialized ChangeStreamMetrics metrics, @UnknownKeyFor @NonNull @Initialized MetadataTableDao metadataTableDao, @Nullable @UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized Instant endTime, @UnknownKeyFor @NonNull @Initialized GenerateInitialPartitionsAction generateInitialPartitionsAction, @UnknownKeyFor @NonNull @Initialized ResumeFromPreviousPipelineAction resumeFromPreviousPipelineAction, @UnknownKeyFor @NonNull @Initialized ProcessNewPartitionsAction processNewPartitionsAction) {
        this.metrics = metrics;
        this.metadataTableDao = metadataTableDao;
        this.endTime = endTime;
        this.generateInitialPartitionsAction = generateInitialPartitionsAction;
        this.resumeFromPreviousPipelineAction = resumeFromPreviousPipelineAction;
        this.processNewPartitionsAction = processNewPartitionsAction;
    }

    private @UnknownKeyFor @NonNull @Initialized Optional<@UnknownKeyFor @NonNull @Initialized Instant> getNewWatermark(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized StreamPartitionWithWatermark> streamPartitionsWithWatermark, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized NewPartition> newPartitions) {
        List<Range.ByteStringRange> overlappingStreamPartitions;
        ArrayList<StreamPartitionWithWatermark> slowPartitions = new ArrayList<StreamPartitionWithWatermark>();
        Instant lowWatermark = Instant.ofEpochMilli((long)Long.MAX_VALUE);
        ArrayList<Range.ByteStringRange> partitions = new ArrayList<Range.ByteStringRange>();
        for (StreamPartitionWithWatermark streamPartitionWithWatermark : streamPartitionsWithWatermark) {
            if (streamPartitionWithWatermark.getWatermark().plus((ReadableDuration)DEBUG_WATERMARK_DELAY).isBeforeNow()) {
                slowPartitions.add(streamPartitionWithWatermark);
            }
            if (streamPartitionWithWatermark.getWatermark().compareTo((ReadableInstant)lowWatermark) < 0) {
                lowWatermark = streamPartitionWithWatermark.getWatermark();
            }
            partitions.add(streamPartitionWithWatermark.getPartition());
        }
        if (!slowPartitions.isEmpty()) {
            LOG.warn("DNP: Updating watermark is held back by {} partitions : {}", (Object)slowPartitions.size(), (Object)slowPartitions.stream().map(e -> ByteStringRangeHelper.formatByteStringRange(e.getPartition()) + " => " + e.getWatermark()).collect(Collectors.joining(", ", "{", "}")));
        }
        if (!(overlappingStreamPartitions = ByteStringRangeHelper.getOverlappingPartitions(partitions)).isEmpty()) {
            LOG.warn("DNP: Updating watermark failed due to overlapping: {}", (Object)ByteStringRangeHelper.partitionsToString(overlappingStreamPartitions));
            return Optional.empty();
        }
        for (NewPartition newPartition : newPartitions) {
            partitions.addAll(newPartition.getParentPartitions());
            if (newPartition.getLowWatermark().compareTo((ReadableInstant)lowWatermark) >= 0) continue;
            lowWatermark = newPartition.getLowWatermark();
        }
        List<Range.ByteStringRange> list = ByteStringRangeHelper.getMissingPartitionsFromEntireKeySpace(partitions);
        if (list.isEmpty()) {
            LOG.info("DNP: Updating watermark: " + lowWatermark);
            return Optional.of(lowWatermark);
        }
        LOG.warn("DNP: Updating watermark failed due to missing {} partitions : {}.", (Object)list.size(), (Object)ByteStringRangeHelper.partitionsToString(list));
        return Optional.empty();
    }

    private void processReconcilerPartitions(// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized DoFn.OutputReceiver<@UnknownKeyFor @NonNull @Initialized PartitionRecord> receiver, @UnknownKeyFor @NonNull @Initialized ManualWatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> watermarkEstimator, @UnknownKeyFor @NonNull @Initialized Instant startTime) {
        for (PartitionRecord reconciledPartition : this.partitionReconciler.getPartitionsToReconcile(watermarkEstimator.currentWatermark(), startTime)) {
            reconciledPartition.setUuid(UniqueIdGenerator.getNextId());
            reconciledPartition.setEndTime(this.endTime);
            for (NewPartition parentPartition : reconciledPartition.getParentPartitions()) {
                this.metadataTableDao.markNewPartitionForDeletion(parentPartition);
            }
            receiver.outputWithTimestamp((Object)reconciledPartition, Instant.EPOCH);
            LOG.warn("DNP: Reconciling missing partition: {}", (Object)reconciledPartition);
        }
    }

    private void cleanUpOrphanedMetadata() {
        for (NewPartition newPartitionToClean : this.orphanedMetadataCleaner.getOrphanedNewPartitions()) {
            this.metrics.incOrphanedNewPartitionCleanedCount();
            this.metadataTableDao.markNewPartitionForDeletion(newPartitionToClean);
            this.metadataTableDao.deleteNewPartition(newPartitionToClean);
        }
    }

    private @UnknownKeyFor @NonNull @Initialized boolean shouldUpdateWatermark(@UnknownKeyFor @NonNull @Initialized long count, @Nullable @UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized DetectNewPartitionsState detectNewPartitionsState) {
        return count % 2L == 0L && (detectNewPartitionsState == null || detectNewPartitionsState.getWatermarkLastUpdated().plus((ReadableDuration)Duration.standardSeconds((long)10L)).isBeforeNow());
    }

    @VisibleForTesting
    public // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized DoFn.ProcessContinuation run(@UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized Long> tracker, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized DoFn.OutputReceiver<@UnknownKeyFor @NonNull @Initialized PartitionRecord> receiver, @UnknownKeyFor @NonNull @Initialized ManualWatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> watermarkEstimator, @UnknownKeyFor @NonNull @Initialized InitialPipelineState initialPipelineState) throws @UnknownKeyFor @NonNull @Initialized Exception {
        LOG.debug("DNP: Watermark: " + watermarkEstimator.getState());
        LOG.debug("DNP: CurrentTracker: " + ((OffsetRange)tracker.currentRestriction()).getFrom());
        if (((OffsetRange)tracker.currentRestriction()).getFrom() == 0L) {
            if (!tracker.tryClaim((Object)0L)) {
                LOG.error("Could not claim initial DetectNewPartition restriction. No partitions are outputted.");
                return DoFn.ProcessContinuation.stop();
            }
            watermarkEstimator.setWatermark(initialPipelineState.getStartTime());
            if (initialPipelineState.isResume()) {
                this.resumeFromPreviousPipelineAction.run(receiver);
            } else {
                this.generateInitialPartitionsAction.run(receiver, initialPipelineState.getStartTime());
            }
            return DoFn.ProcessContinuation.resume();
        }
        this.partitionReconciler = new PartitionReconciler(this.metadataTableDao, this.metrics);
        this.orphanedMetadataCleaner = new OrphanedMetadataCleaner();
        DetectNewPartitionsState detectNewPartitionsState = this.metadataTableDao.readDetectNewPartitionsState();
        if (detectNewPartitionsState != null) {
            watermarkEstimator.setWatermark(detectNewPartitionsState.getWatermark());
        }
        if (this.endTime != null && !watermarkEstimator.currentWatermark().isBefore((ReadableInstant)this.endTime)) {
            tracker.tryClaim((Object)((OffsetRange)tracker.currentRestriction()).getTo());
            return DoFn.ProcessContinuation.stop();
        }
        if (!tracker.tryClaim((Object)((OffsetRange)tracker.currentRestriction()).getFrom())) {
            LOG.warn("DNP: Checkpointing, stopping this run: " + tracker.currentRestriction());
            return DoFn.ProcessContinuation.stop();
        }
        List<StreamPartitionWithWatermark> streamPartitionsWithWatermark = null;
        if (this.shouldUpdateWatermark(((OffsetRange)tracker.currentRestriction()).getFrom(), detectNewPartitionsState)) {
            streamPartitionsWithWatermark = this.metadataTableDao.readStreamPartitionsWithWatermark();
        }
        List<NewPartition> newPartitions = this.metadataTableDao.readNewPartitions();
        ArrayList<Range.ByteStringRange> outputtedNewPartitions = new ArrayList<Range.ByteStringRange>();
        for (NewPartition newPartition : newPartitions) {
            if (this.processNewPartitionsAction.processNewPartition(newPartition, receiver)) {
                outputtedNewPartitions.add(newPartition.getPartition());
                continue;
            }
            this.partitionReconciler.addIncompleteNewPartitions(newPartition);
            this.orphanedMetadataCleaner.addIncompleteNewPartitions(newPartition);
        }
        if (streamPartitionsWithWatermark != null) {
            Optional<Instant> maybeWatermark = this.getNewWatermark(streamPartitionsWithWatermark, newPartitions);
            maybeWatermark.ifPresent(this.metadataTableDao::updateDetectNewPartitionWatermark);
            List<Range.ByteStringRange> existingPartitions = streamPartitionsWithWatermark.stream().map(StreamPartitionWithWatermark::getPartition).collect(Collectors.toList());
            existingPartitions.addAll(outputtedNewPartitions);
            List<Range.ByteStringRange> missingStreamPartitions = ByteStringRangeHelper.getMissingPartitionsFromEntireKeySpace(existingPartitions);
            this.orphanedMetadataCleaner.addMissingPartitions(missingStreamPartitions);
            this.partitionReconciler.addMissingPartitions(missingStreamPartitions);
        }
        if (((OffsetRange)tracker.currentRestriction()).getFrom() > 50L) {
            this.processReconcilerPartitions(receiver, watermarkEstimator, initialPipelineState.getStartTime());
            this.cleanUpOrphanedMetadata();
        }
        return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis((long)100L));
    }
}

