/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.util;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBasedLog<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
    private static final long CREATE_TOPIC_TIMEOUT_MS = 30000L;
    private Time time;
    private final String topic;
    private final Map<String, Object> producerConfigs;
    private final Map<String, Object> consumerConfigs;
    private final Callback<ConsumerRecord<K, V>> consumedCallback;
    private Consumer<K, V> consumer;
    private Producer<K, V> producer;
    private Thread thread;
    private boolean stopRequested;
    private Queue<Callback<Void>> readLogEndOffsetCallbacks;
    private Runnable initializer;

    public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, Callback<ConsumerRecord<K, V>> consumedCallback, Time time, Runnable initializer) {
        this.topic = topic;
        this.producerConfigs = producerConfigs;
        this.consumerConfigs = consumerConfigs;
        this.consumedCallback = consumedCallback;
        this.stopRequested = false;
        this.readLogEndOffsetCallbacks = new ArrayDeque<Callback<Void>>();
        this.time = time;
        this.initializer = initializer != null ? initializer : new Runnable(){

            @Override
            public void run() {
            }
        };
    }

    public void start() {
        log.info("Starting KafkaBasedLog with topic " + this.topic);
        this.initializer.run();
        this.producer = this.createProducer();
        this.consumer = this.createConsumer();
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
        List partitionInfos = null;
        long started = this.time.milliseconds();
        while (partitionInfos == null && this.time.milliseconds() - started < 30000L) {
            partitionInfos = this.consumer.partitionsFor(this.topic);
            Utils.sleep((long)Math.min(this.time.milliseconds() - started, 1000L));
        }
        if (partitionInfos == null) {
            throw new ConnectException("Could not look up partition metadata for offset backing store topic in allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if this is your first use of the topic it may have taken too long to create.");
        }
        for (PartitionInfo partition : partitionInfos) {
            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }
        this.consumer.assign(partitions);
        this.readToLogEnd();
        this.thread = new WorkThread();
        this.thread.start();
        log.info("Finished reading KafkaBasedLog for topic " + this.topic);
        log.info("Started KafkaBasedLog for topic " + this.topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        log.info("Stopping KafkaBasedLog for topic " + this.topic);
        KafkaBasedLog kafkaBasedLog = this;
        synchronized (kafkaBasedLog) {
            this.stopRequested = true;
        }
        this.consumer.wakeup();
        try {
            this.thread.join();
        }
        catch (InterruptedException e) {
            throw new ConnectException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting down it's producer and consumer.", (Throwable)e);
        }
        try {
            this.producer.close();
        }
        catch (KafkaException e) {
            log.error("Failed to stop KafkaBasedLog producer", (Throwable)e);
        }
        try {
            this.consumer.close();
        }
        catch (KafkaException e) {
            log.error("Failed to stop KafkaBasedLog consumer", (Throwable)e);
        }
        log.info("Stopped KafkaBasedLog for topic " + this.topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void readToEnd(Callback<Void> callback) {
        log.trace("Starting read to end log for topic {}", (Object)this.topic);
        this.producer.flush();
        KafkaBasedLog kafkaBasedLog = this;
        synchronized (kafkaBasedLog) {
            this.readLogEndOffsetCallbacks.add(callback);
        }
        this.consumer.wakeup();
    }

    public void flush() {
        this.producer.flush();
    }

    public Future<Void> readToEnd() {
        FutureCallback<Void> future = new FutureCallback<Void>(null);
        this.readToEnd(future);
        return future;
    }

    public void send(K key, V value) {
        this.send(key, value, null);
    }

    public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
        this.producer.send(new ProducerRecord(this.topic, key, value), callback);
    }

    private Producer<K, V> createProducer() {
        this.producerConfigs.put("acks", "all");
        this.producerConfigs.put("max.in.flight.requests.per.connection", 1);
        return new KafkaProducer(this.producerConfigs);
    }

    private Consumer<K, V> createConsumer() {
        this.consumerConfigs.put("auto.offset.reset", "earliest");
        this.consumerConfigs.put("enable.auto.commit", false);
        return new KafkaConsumer(this.consumerConfigs);
    }

    private void poll(long timeoutMs) {
        try {
            ConsumerRecords records = this.consumer.poll(timeoutMs);
            for (ConsumerRecord record : records) {
                this.consumedCallback.onCompletion(null, record);
            }
        }
        catch (WakeupException e) {
            throw e;
        }
        catch (KafkaException e) {
            log.error("Error polling: " + (Object)((Object)e));
        }
    }

    private void readToLogEnd() {
        log.trace("Reading to end of offset log");
        Set assignment = this.consumer.assignment();
        Map endOffsets = this.consumer.endOffsets((Collection)assignment);
        log.trace("Reading to end of log offsets {}", (Object)endOffsets);
        block0: while (!endOffsets.isEmpty()) {
            Iterator it = endOffsets.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry = it.next();
                if (this.consumer.position((TopicPartition)entry.getKey()) >= (Long)entry.getValue()) {
                    it.remove();
                    continue;
                }
                this.poll(Integer.MAX_VALUE);
                continue block0;
            }
        }
    }

    private class WorkThread
    extends Thread {
        public WorkThread() {
            super("KafkaBasedLog Work Thread - " + KafkaBasedLog.this.topic);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                log.trace("{} started execution", (Object)this);
                while (true) {
                    int numCallbacks;
                    KafkaBasedLog kafkaBasedLog = KafkaBasedLog.this;
                    synchronized (kafkaBasedLog) {
                        if (KafkaBasedLog.this.stopRequested) {
                            break;
                        }
                        numCallbacks = KafkaBasedLog.this.readLogEndOffsetCallbacks.size();
                    }
                    if (numCallbacks > 0) {
                        try {
                            KafkaBasedLog.this.readToLogEnd();
                            log.trace("Finished read to end log for topic {}", (Object)KafkaBasedLog.this.topic);
                        }
                        catch (WakeupException e) {
                            continue;
                        }
                    }
                    KafkaBasedLog e = KafkaBasedLog.this;
                    synchronized (e) {
                        for (int i = 0; i < numCallbacks; ++i) {
                            Callback cb = (Callback)KafkaBasedLog.this.readLogEndOffsetCallbacks.poll();
                            cb.onCompletion(null, null);
                        }
                    }
                    try {
                        KafkaBasedLog.this.poll(Integer.MAX_VALUE);
                    }
                    catch (WakeupException e2) {}
                }
            }
            catch (Throwable t) {
                log.error("Unexpected exception in {}", (Object)this, (Object)t);
            }
        }
    }
}

