/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.common.Status;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.CloseStream;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Optional;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamContinuationTokenHelper;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ChangeStreamAction;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processes one invocation of a change stream partition read.
 *
 * <p>Each call to {@link #run} tries to lock the partition in the metadata table, then either
 * finalizes a partition whose restriction already carries a {@code CloseStream}, or streams change
 * stream records for the partition and hands each one to the {@link ChangeStreamAction}.
 */
@Internal
public class ReadChangeStreamPartitionAction {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionAction.class);
    private final @UnknownKeyFor @NonNull @Initialized MetadataTableDao metadataTableDao;
    private final @UnknownKeyFor @NonNull @Initialized ChangeStreamDao changeStreamDao;
    private final @UnknownKeyFor @NonNull @Initialized ChangeStreamMetrics metrics;
    private final @UnknownKeyFor @NonNull @Initialized ChangeStreamAction changeStreamAction;
    private final @UnknownKeyFor @NonNull @Initialized Duration heartbeatDuration;

    /**
     * Creates the action with its collaborators.
     *
     * @param metadataTableDao used to lock partitions, update watermarks, register child partitions
     *     and delete the stream partition row
     * @param changeStreamDao used to open the change stream for a partition
     * @param metrics counters updated when a partition stream ends
     * @param changeStreamAction invoked for every record read from the change stream
     * @param heartbeatDuration passed through to {@code ChangeStreamDao.readChangeStreamPartition}
     */
    public ReadChangeStreamPartitionAction(@UnknownKeyFor @NonNull @Initialized MetadataTableDao metadataTableDao, @UnknownKeyFor @NonNull @Initialized ChangeStreamDao changeStreamDao, @UnknownKeyFor @NonNull @Initialized ChangeStreamMetrics metrics, @UnknownKeyFor @NonNull @Initialized ChangeStreamAction changeStreamAction, @UnknownKeyFor @NonNull @Initialized Duration heartbeatDuration) {
        this.metadataTableDao = metadataTableDao;
        this.changeStreamDao = changeStreamDao;
        this.metrics = metrics;
        this.changeStreamAction = changeStreamAction;
        this.heartbeatDuration = heartbeatDuration;
    }

    /**
     * Runs one processing pass over the given partition.
     *
     * <ol>
     *   <li>If the partition lock cannot be acquired, claims a "failed to lock" progress on the
     *       tracker and stops.
     *   <li>If the current restriction already holds a {@code CloseStream}, registers the child
     *       partitions, deletes this partition's row and stops (or just stops when the close status
     *       is not {@code OUT_OF_RANGE}).
     *   <li>Otherwise persists the current watermark and token, opens the change stream and
     *       delegates every record to the {@link ChangeStreamAction}; the first non-empty result
     *       it returns is returned from this method.
     * </ol>
     *
     * @param partitionRecord the partition to read
     * @param tracker restriction tracker holding the current {@link StreamProgress}
     * @param receiver output receiver forwarded to the {@link ChangeStreamAction}
     * @param watermarkEstimator estimator whose state is persisted and used for diagnostics
     * @return {@code stop()} on lock failure or after handling a {@code CloseStream}; the
     *     continuation produced by the {@link ChangeStreamAction} if any; {@code resume()} when the
     *     stream ends without one
     * @throws IOException declared for failures propagated from the underlying reads
     */
    public DoFn.@UnknownKeyFor @NonNull @Initialized ProcessContinuation run(@UnknownKeyFor @NonNull @Initialized PartitionRecord partitionRecord, @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized StreamProgress, @UnknownKeyFor @NonNull @Initialized StreamProgress> tracker, DoFn.@UnknownKeyFor @NonNull @Initialized OutputReceiver<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized ByteString, @UnknownKeyFor @NonNull @Initialized ChangeStreamMutation>> receiver, @UnknownKeyFor @NonNull @Initialized ManualWatermarkEstimator<@UnknownKeyFor @NonNull @Initialized Instant> watermarkEstimator) throws @UnknownKeyFor @NonNull @Initialized IOException {
        // Verbose diagnostics are enabled only once the estimator state is more than five minutes in the past.
        boolean shouldDebug = watermarkEstimator.getState().plus(Duration.standardMinutes(5L)).isBeforeNow();
        if (shouldDebug) {
            LOG.info("RCSP: Partition: {}\n Watermark: {}\n RestrictionTracker: {}", partitionRecord, watermarkEstimator.getState(), tracker.currentRestriction());
        }

        if (!this.metadataTableDao.lockPartition(partitionRecord.getPartition(), partitionRecord.getUuid())) {
            return this.stopAfterFailedLock(partitionRecord, tracker);
        }

        CloseStream closeStream = tracker.currentRestriction().getCloseStream();
        if (closeStream != null) {
            return this.finishClosedStream(partitionRecord, closeStream, watermarkEstimator, shouldDebug);
        }

        // Persist progress before (re)opening the stream.
        this.metadataTableDao.updateWatermark(partitionRecord.getPartition(), watermarkEstimator.getState(), tracker.currentRestriction().getCurrentToken());

        ServerStream<ChangeStreamRecord> stream = null;
        try {
            stream = this.changeStreamDao.readChangeStreamPartition(partitionRecord, tracker.currentRestriction(), this.heartbeatDuration, shouldDebug);
            for (ChangeStreamRecord changeStreamRecord : stream) {
                Optional<DoFn.ProcessContinuation> result = this.changeStreamAction.run(partitionRecord, changeStreamRecord, tracker, receiver, watermarkEstimator, shouldDebug);
                // An empty result means "keep reading"; anything else ends this pass.
                if (result.isPresent()) {
                    return result.get();
                }
            }
        } finally {
            // Always cancel the stream, whether we returned early, finished, or failed.
            if (stream != null) {
                stream.cancel();
            }
        }
        return DoFn.ProcessContinuation.resume();
    }

    /**
     * Handles a failed partition lock: logs it, decrements the partition stream count, claims a
     * {@link StreamProgress} flagged with {@code failToLock} and stops.
     */
    private DoFn.ProcessContinuation stopAfterFailedLock(PartitionRecord partitionRecord, RestrictionTracker<StreamProgress, StreamProgress> tracker) {
        LOG.info("RCSP: Could not acquire lock for partition: {}, with uid: {}, because this is a duplicate and another worker is working on this partition already.", ByteStringRangeHelper.formatByteStringRange(partitionRecord.getPartition()), partitionRecord.getUuid());
        StreamProgress streamProgress = new StreamProgress();
        streamProgress.setFailToLock(true);
        this.metrics.decPartitionStreamCount();
        if (!tracker.tryClaim(streamProgress)) {
            LOG.debug("RCSP: Failed to claim tracker after failing to lock partition.");
        }
        // Processing stops regardless of whether the claim succeeded.
        return DoFn.ProcessContinuation.stop();
    }

    /**
     * Finalizes a partition whose restriction carries a {@code CloseStream}.
     *
     * <p>For a status other than {@code OUT_OF_RANGE} this only logs an error and decrements the
     * partition stream count. Otherwise every continuation token yields one child partition that
     * is written to the metadata table, after which this partition's row is deleted.
     */
    private DoFn.ProcessContinuation finishClosedStream(PartitionRecord partitionRecord, CloseStream closeStream, ManualWatermarkEstimator<Instant> watermarkEstimator, boolean shouldDebug) {
        if (closeStream.getStatus().getCode() != Status.Code.OUT_OF_RANGE) {
            LOG.error("RCSP {}: Reached unexpected terminal state: {}", ByteStringRangeHelper.formatByteStringRange(partitionRecord.getPartition()), closeStream.getStatus());
            this.metrics.decPartitionStreamCount();
            return DoFn.ProcessContinuation.stop();
        }

        ArrayList<Range.ByteStringRange> childPartitions = new ArrayList<>();
        // The new-partitions list is used only when it lines up one-to-one with the tokens;
        // otherwise each child partition is taken from its continuation token.
        boolean useNewPartitionsField = closeStream.getNewPartitions().size() == closeStream.getChangeStreamContinuationTokens().size();
        for (int i = 0; i < closeStream.getChangeStreamContinuationTokens().size(); ++i) {
            Range.ByteStringRange childPartition = useNewPartitionsField
                    ? closeStream.getNewPartitions().get(i)
                    : closeStream.getChangeStreamContinuationTokens().get(i).getPartition();
            childPartitions.add(childPartition);
            ChangeStreamContinuationToken token = ChangeStreamContinuationTokenHelper.getTokenWithCorrectPartition(partitionRecord.getPartition(), closeStream.getChangeStreamContinuationTokens().get(i));
            this.metadataTableDao.writeNewPartition(childPartition, token, partitionRecord.getPartition(), watermarkEstimator.getState());
        }

        if (shouldDebug) {
            LOG.info("RCSP {}: Split/Merge into {}", ByteStringRangeHelper.formatByteStringRange(partitionRecord.getPartition()), ByteStringRangeHelper.partitionsToString(childPartitions));
        }
        if (!ByteStringRangeHelper.isSuperset(childPartitions, partitionRecord.getPartition())) {
            LOG.warn("RCSP {}: CloseStream has child partition(s) {} that doesn't cover the keyspace", ByteStringRangeHelper.formatByteStringRange(partitionRecord.getPartition()), ByteStringRangeHelper.partitionsToString(childPartitions));
        }

        this.metadataTableDao.deleteStreamPartitionRow(partitionRecord.getPartition());
        this.metrics.decPartitionStreamCount();
        return DoFn.ProcessContinuation.stop();
    }
}

