/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;

import com.google.cloud.Timestamp;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChildPartitionsRecordAction {
    private static final Logger LOG = LoggerFactory.getLogger(ChildPartitionsRecordAction.class);
    private final PartitionMetadataDao partitionMetadataDao;
    private final ChangeStreamMetrics metrics;

    ChildPartitionsRecordAction(PartitionMetadataDao partitionMetadataDao, ChangeStreamMetrics metrics) {
        this.partitionMetadataDao = partitionMetadataDao;
        this.metrics = metrics;
    }

    @VisibleForTesting
    public Optional<DoFn.ProcessContinuation> run(PartitionMetadata partition, ChildPartitionsRecord record, RestrictionTracker<TimestampRange, Timestamp> tracker, ManualWatermarkEstimator<Instant> watermarkEstimator) {
        String token = partition.getPartitionToken();
        LOG.debug("[" + token + "] Processing child partition record " + record);
        Timestamp startTimestamp = record.getStartTimestamp();
        Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime());
        if (!tracker.tryClaim((Object)startTimestamp)) {
            LOG.debug("[" + token + "] Could not claim queryChangeStream(" + startTimestamp + "), stopping");
            return Optional.of(DoFn.ProcessContinuation.stop());
        }
        watermarkEstimator.setWatermark(startInstant);
        for (ChildPartition childPartition : record.getChildPartitions()) {
            this.processChildPartition(partition, record, childPartition);
        }
        LOG.debug("[" + token + "] Child partitions action completed successfully");
        return Optional.empty();
    }

    private void processChildPartition(PartitionMetadata partition, ChildPartitionsRecord record, ChildPartition childPartition) {
        String partitionToken = partition.getPartitionToken();
        String childPartitionToken = childPartition.getToken();
        boolean isSplit = this.isSplit(childPartition);
        LOG.debug("[" + partitionToken + "] Processing child partition" + (isSplit ? " split" : " merge") + " event");
        PartitionMetadata row = this.toPartitionMetadata(record.getStartTimestamp(), partition.getEndTimestamp(), partition.getHeartbeatMillis(), childPartition);
        LOG.debug("[" + partitionToken + "] Inserting child partition token " + childPartitionToken);
        Boolean insertedRow = this.partitionMetadataDao.runInTransaction(transaction -> {
            if (transaction.getPartition(childPartitionToken) == null) {
                transaction.insert(row);
                return true;
            }
            return false;
        }).getResult();
        if (insertedRow.booleanValue() && isSplit) {
            this.metrics.incPartitionRecordSplitCount();
        } else if (insertedRow.booleanValue()) {
            this.metrics.incPartitionRecordMergeCount();
        } else {
            LOG.debug("[" + partitionToken + "] Child token " + childPartitionToken + " already exists, skipping...");
        }
    }

    private boolean isSplit(ChildPartition childPartition) {
        return childPartition.getParentTokens().size() == 1;
    }

    private PartitionMetadata toPartitionMetadata(Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis, ChildPartition childPartition) {
        return PartitionMetadata.newBuilder().setPartitionToken(childPartition.getToken()).setParentTokens(childPartition.getParentTokens()).setStartTimestamp(startTimestamp).setEndTimestamp(endTimestamp).setHeartbeatMillis(heartbeatMillis).setState(PartitionMetadata.State.CREATED).setWatermark(startTimestamp).build();
    }
}

