/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.connector.cassandra.CassandraTopicSelector;
import io.debezium.connector.cassandra.KeyspaceTable;
import io.debezium.connector.cassandra.OffsetFlushPolicy;
import io.debezium.connector.cassandra.OffsetWriter;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.SourceInfo;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sends change {@link Record}s to Kafka and marks their source offsets through
 * the supplied {@link OffsetWriter} once the corresponding send has completed
 * successfully.
 *
 * <p>Each record is serialized with the configured key/value {@link Converter}s
 * and handed to a {@link KafkaProducer}. The returned futures are kept in
 * insertion order until the {@link OffsetFlushPolicy} asks for a flush, at
 * which point every pending future is awaited and offsets are marked.
 *
 * <p>{@link #emit(Record)} is serialized on an internal lock, so the pending
 * futures and the flush bookkeeping are only touched by one thread at a time.
 */
public class KafkaRecordEmitter
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordEmitter.class);
    private final KafkaProducer<byte[], byte[]> producer;
    private final TopicSelector<KeyspaceTable> topicSelector;
    private final OffsetWriter offsetWriter;
    private final OffsetFlushPolicy offsetFlushPolicy;
    // Insertion-ordered so that pending sends are awaited in the order they were emitted.
    private final Map<Record, Future<RecordMetadata>> futures = new LinkedHashMap<>();
    private final Object lock = new Object();
    // Wall-clock millis of the last flush; starts at 0, so the very first emit
    // reports a very large "time since last flush" to the flush policy.
    private long timeOfLastFlush;
    // Counts successful sends since the last progress log line (reset every 10000).
    private long emitCount = 0L;
    private final Converter keyConverter;
    private final Converter valueConverter;

    /**
     * Creates an emitter backed by a new {@link KafkaProducer}.
     *
     * @param kafkaTopicPrefix prefix passed to the topic selector
     * @param heartbeatPrefix heartbeat prefix passed to the topic selector
     * @param kafkaProperties configuration for the Kafka producer
     * @param offsetWriter receives the offsets of successfully sent records
     * @param offsetFlushIntervalMs flush interval; a zero duration selects
     *        {@link OffsetFlushPolicy#always()}, anything else selects
     *        {@link OffsetFlushPolicy#periodic}
     * @param maxOffsetFlushSize size bound handed to the periodic flush policy
     * @param keyConverter serializes record keys
     * @param valueConverter serializes record values
     */
    public KafkaRecordEmitter(String kafkaTopicPrefix, String heartbeatPrefix, Properties kafkaProperties, OffsetWriter offsetWriter, Duration offsetFlushIntervalMs, long maxOffsetFlushSize, Converter keyConverter, Converter valueConverter) {
        this.producer = new KafkaProducer<>(kafkaProperties);
        this.topicSelector = CassandraTopicSelector.defaultSelector(kafkaTopicPrefix, heartbeatPrefix);
        this.offsetWriter = offsetWriter;
        this.offsetFlushPolicy = offsetFlushIntervalMs.isZero() ? OffsetFlushPolicy.always() : OffsetFlushPolicy.periodic(offsetFlushIntervalMs, maxOffsetFlushSize);
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
    }

    /**
     * Serializes and sends the record, remembers the pending send, and flushes
     * pending sends and offsets if the flush policy requests it.
     *
     * @param record the record to send
     */
    public void emit(Record record) {
        synchronized (this.lock) {
            ProducerRecord<byte[], byte[]> producerRecord = this.toProducerRecord(record);
            // Pass the record itself so toString() only runs when debug logging is enabled.
            LOGGER.debug("Sending the record '{}'", record);
            Future<RecordMetadata> future = this.producer.send(producerRecord);
            LOGGER.debug("The record '{}' has been sent", record);
            this.futures.put(record, future);
            this.maybeFlushAndMarkOffset();
        }
    }

    /**
     * Builds the Kafka producer record for the given record: resolves the topic
     * from the record's keyspace/table and serializes key and value with the
     * configured converters.
     */
    private ProducerRecord<byte[], byte[]> toProducerRecord(Record record) {
        String topic = this.topicSelector.topicNameFor(record.getSource().keyspaceTable);
        byte[] serializedKey = this.keyConverter.fromConnectData(topic, record.getKeySchema(), record.buildKey());
        byte[] serializedValue = this.valueConverter.fromConnectData(topic, record.getValueSchema(), record.buildValue());
        return new ProducerRecord<>(topic, serializedKey, serializedValue);
    }

    /**
     * Asks the flush policy whether to flush, given the time elapsed since the
     * last flush and the number of pending sends, and flushes if so.
     */
    private void maybeFlushAndMarkOffset() {
        long now = System.currentTimeMillis();
        long timeSinceLastFlush = now - this.timeOfLastFlush;
        if (this.offsetFlushPolicy.shouldFlush(Duration.ofMillis(timeSinceLastFlush), this.futures.size())) {
            this.flushAndMarkOffset();
            this.timeOfLastFlush = now;
        }
    }

    /**
     * Waits for every pending send in emission order, marks the offset of each
     * record that was sent successfully and wants its offset marked, then
     * flushes the offset writer and forgets all pending sends.
     */
    private void flushAndMarkOffset() {
        for (Map.Entry<Record, Future<RecordMetadata>> recordEntry : this.futures.entrySet()) {
            if (this.flush(recordEntry) && this.hasOffset(recordEntry)) {
                this.markOffset(recordEntry);
            }
        }
        this.offsetWriter.flush();
        this.futures.clear();
    }

    /**
     * Blocks until the send for the given entry has completed.
     *
     * @return {@code true} if the send completed normally, {@code false} if it
     *         failed or the wait was interrupted
     */
    private boolean flush(Map.Entry<Record, Future<RecordMetadata>> recordEntry) {
        try {
            recordEntry.getValue().get();
            if (++this.emitCount % 10000L == 0L) {
                LOGGER.info("Emitted {} records to Kafka Broker", this.emitCount);
                this.emitCount = 0L;
            }
            return true;
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can
            // still observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            LOGGER.error("Failed to emit record {}", recordEntry.getKey(), e);
            return false;
        }
        catch (ExecutionException e) {
            LOGGER.error("Failed to emit record {}", recordEntry.getKey(), e);
            return false;
        }
    }

    /** Returns whether the entry's record asks for its offset to be marked. */
    private boolean hasOffset(Map.Entry<Record, Future<RecordMetadata>> recordEntry) {
        return recordEntry.getKey().shouldMarkOffset();
    }

    /**
     * Marks the offset of the entry's record on the offset writer, keyed by the
     * source table name, and logs when the record originates from a snapshot.
     */
    private void markOffset(Map.Entry<Record, Future<RecordMetadata>> recordEntry) {
        SourceInfo source = recordEntry.getKey().getSource();
        String sourceTable = source.keyspaceTable.name();
        String sourceOffset = source.offsetPosition.serialize();
        boolean isSnapshot = source.snapshot;
        this.offsetWriter.markOffset(sourceTable, sourceOffset, isSnapshot);
        if (isSnapshot) {
            LOGGER.info("Mark snapshot offset for table '{}'", sourceTable);
        }
    }

    /** Closes the underlying Kafka producer. */
    @Override
    public void close() {
        this.producer.close();
    }
}

