/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.meters;

import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.metrics.traits.StreamingMetricsMXBean;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.spi.schema.DataCollectionId;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.data.Struct;

/**
 * Collects streaming-phase metrics and exposes them through {@link StreamingMetricsMXBean}.
 *
 * <p>The meter tracks four values, each updated from {@link #onEvent}:
 * <ul>
 *   <li>the lag between the most recent event's timestamp and the local clock,</li>
 *   <li>the number of distinct consecutive transaction IDs observed,</li>
 *   <li>the source position of the most recent event, and</li>
 *   <li>the most recently observed transaction ID.</li>
 * </ul>
 *
 * <p>Every value lives in its own atomic holder, so individual reads and writes are safe
 * without locking. No lock is held across fields, which means a reader may observe the
 * fields at slightly different points in time while an event is being recorded.
 */
@ThreadSafe
public class StreamingMeter
implements StreamingMetricsMXBean {
    /** Lag computed for the latest event that carried a timestamp; {@code null} until one is seen. */
    private final AtomicReference<Duration> lagBehindSource = new AtomicReference<>();
    /** Count of transaction-ID changes seen since construction or the last {@link #reset()}. */
    private final AtomicLong numberOfCommittedTransactions = new AtomicLong();
    /** Source position reported for the latest event; starts as an empty map. */
    private final AtomicReference<Map<String, String>> sourceEventPosition = new AtomicReference<>(Collections.emptyMap());
    /** Transaction ID of the latest event that carried one; {@code null} until one is seen. */
    private final AtomicReference<String> lastTransactionId = new AtomicReference<>();
    private final CdcSourceTaskContext taskContext;
    private final EventMetadataProvider metadataProvider;

    /**
     * Creates a meter with all metrics in their initial (empty) state.
     *
     * @param taskContext      supplies the captured data collections reported by {@link #getCapturedTables()}
     * @param metadataProvider extracts timestamp, transaction ID and source position from each event
     */
    public StreamingMeter(CdcSourceTaskContext taskContext, EventMetadataProvider metadataProvider) {
        this.taskContext = taskContext;
        this.metadataProvider = metadataProvider;
    }

    /**
     * @return the captured data collections as reported by the task context
     */
    @Override
    public String[] getCapturedTables() {
        return taskContext.capturedDataCollections();
    }

    /**
     * @return the source position recorded for the latest event, or an empty map if none
     *         has been recorded since construction or the last {@link #reset()}
     */
    @Override
    public Map<String, String> getSourceEventPosition() {
        return sourceEventPosition.get();
    }

    /**
     * @return the last recorded lag in milliseconds, or {@code -1} if no lag has been recorded
     */
    @Override
    public long getMilliSecondsBehindSource() {
        final Duration lag = lagBehindSource.get();
        return lag != null ? lag.toMillis() : -1L;
    }

    /**
     * @return the number of transaction-ID changes counted so far
     */
    @Override
    public long getNumberOfCommittedTransactions() {
        return numberOfCommittedTransactions.get();
    }

    /**
     * @return the most recently recorded transaction ID, or {@code null} if none has been recorded
     */
    @Override
    public String getLastTransactionId() {
        return lastTransactionId.get();
    }

    /**
     * Updates the metrics from a single event.
     *
     * <p>Each piece of metadata is optional: when the metadata provider returns {@code null}
     * for the timestamp, transaction ID or source position, the corresponding metric keeps
     * its previous value.
     *
     * @param source the data collection the event belongs to
     * @param offset the offset context associated with the event
     * @param key    the event key
     * @param value  the event value
     */
    public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
        final Instant eventTimestamp = metadataProvider.getEventTimestamp(source, offset, key, value);
        if (eventTimestamp != null) {
            lagBehindSource.set(Duration.between(eventTimestamp, Instant.now()));
        }

        final String transactionId = metadataProvider.getTransactionId(source, offset, key, value);
        // Swap in the new ID and compare against the previous one in a single atomic step,
        // so that concurrent callers reporting the same new transaction count it only once.
        if (transactionId != null && !transactionId.equals(lastTransactionId.getAndSet(transactionId))) {
            numberOfCommittedTransactions.incrementAndGet();
        }

        final Map<String, String> eventSource = metadataProvider.getEventSourcePosition(source, offset, key, value);
        if (eventSource != null) {
            sourceEventPosition.set(eventSource);
        }
    }

    /**
     * Restores every metric to its initial state: no lag, zero transactions,
     * an empty source position and no last transaction ID.
     */
    public void reset() {
        lagBehindSource.set(null);
        numberOfCommittedTransactions.set(0L);
        sourceEventPosition.set(Collections.emptyMap());
        lastTransactionId.set(null);
    }
}

