/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.consumer;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.PartitionInfo;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumerHelper;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A consumer of Kafka records that wraps a Vert.x {@link KafkaConsumer}.
 * <p>
 * The consumer subscribes either to a fixed set of topics or to a topic pattern. Calling
 * {@link #start()} creates the underlying consumer, subscribes it and completes once the
 * first partition assignment has been reported (or fails after a timeout). Records are
 * passed to the record handler given in the constructor, unless their TTL has elapsed and
 * TTL handling is enabled (see {@link #setRespectTtl(boolean)}).
 * <p>
 * Handlers for partition assignment and revocation can be registered via the
 * {@code setOn...Handler} methods; they are invoked on the Vert.x context obtained in
 * {@link #start()}. Subclasses may additionally override the {@code on...Blocking} hooks,
 * which are invoked directly from within the rebalance listener callback.
 */
public class HonoKafkaConsumer implements Lifecycle {

    /** Maximum time (in milliseconds) to wait for a rebalance after (re-)subscribing. */
    private static final long WAIT_FOR_REBALANCE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
    /** Poll timeout applied to the consumer once {@link #start()} has succeeded. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1L);

    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    protected final Vertx vertx;
    /** Set to {@code true} by the first invocation of {@link #stop()} on a started consumer. */
    protected final AtomicBoolean stopCalled = new AtomicBoolean();
    protected final Map<String, String> consumerConfig;
    /** The topics to subscribe to; {@code null} if a topic pattern is used. */
    protected final Set<String> topics;
    /** The topic pattern to subscribe to; {@code null} if a fixed topic set is used. */
    protected final Pattern topicPattern;

    /**
     * Holds the promise that gets completed on the next partition assignment.
     * The reference is cleared when the promise is completed or has timed out.
     */
    private final AtomicReference<Promise<Void>> subscribeDonePromiseRef = new AtomicReference<>();
    private final Handler<KafkaConsumerRecord<String, Buffer>> recordHandler;
    private final AtomicBoolean paused = new AtomicBoolean();

    private KafkaConsumer<String, Buffer> kafkaConsumer;
    private Context context;
    private ExecutorService kafkaConsumerWorker;
    /** Snapshot of the subscription taken on partition assignment; only updated for pattern subscriptions. */
    private volatile Set<String> subscribedTopicPatternTopics = new HashSet<>();
    private Handler<Set<TopicPartition>> onPartitionsAssignedHandler;
    private Handler<Set<TopicPartition>> onRebalanceDoneHandler;
    private Handler<Set<TopicPartition>> onPartitionsRevokedHandler;
    private boolean respectTtl = true;
    private Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier;
    private KafkaClientMetricsSupport metricsSupport;

    /**
     * Creates a consumer that subscribes to a fixed set of topics.
     *
     * @param vertx The Vert.x instance to use.
     * @param topics The topics to consume records from.
     * @param recordHandler The handler to invoke for each consumed record.
     * @param consumerConfig The Kafka consumer configuration. Note that this map is modified if it
     *                       contains no {@code group.id} entry (see the protected constructor).
     * @throws NullPointerException if any of the parameters is {@code null}.
     * @throws IllegalArgumentException if no {@code group.id} is configured but auto-commit is enabled.
     */
    public HonoKafkaConsumer(
            final Vertx vertx,
            final Set<String> topics,
            final Handler<KafkaConsumerRecord<String, Buffer>> recordHandler,
            final Map<String, String> consumerConfig) {
        this(vertx, Objects.requireNonNull(topics), null, recordHandler, consumerConfig);
    }

    /**
     * Creates a consumer that subscribes to all topics matching a pattern.
     *
     * @param vertx The Vert.x instance to use.
     * @param topicPattern The pattern of the topic names to consume records from.
     * @param recordHandler The handler to invoke for each consumed record.
     * @param consumerConfig The Kafka consumer configuration. Note that this map is modified if it
     *                       contains no {@code group.id} entry (see the protected constructor).
     * @throws NullPointerException if any of the parameters is {@code null}.
     * @throws IllegalArgumentException if no {@code group.id} is configured but auto-commit is enabled.
     */
    public HonoKafkaConsumer(
            final Vertx vertx,
            final Pattern topicPattern,
            final Handler<KafkaConsumerRecord<String, Buffer>> recordHandler,
            final Map<String, String> consumerConfig) {
        this(vertx, null, Objects.requireNonNull(topicPattern), recordHandler, consumerConfig);
    }

    /**
     * Creates a consumer for either a set of topics or a topic pattern.
     * <p>
     * If the given configuration has no {@code group.id} entry, a random UUID is put into the
     * given map as group id and {@code enable.auto.commit} is set to {@code false}.
     *
     * @param vertx The Vert.x instance to use.
     * @param topics The topics to consume records from ({@code null} if a pattern is given).
     * @param topicPattern The pattern of topic names to consume from ({@code null} if topics are given).
     * @param recordHandler The handler to invoke for each consumed record.
     * @param consumerConfig The Kafka consumer configuration (must be modifiable if it has no
     *                       {@code group.id} entry).
     * @throws NullPointerException if vertx, recordHandler or consumerConfig is {@code null}, or if
     *                              not exactly one of topics and topicPattern is set.
     * @throws IllegalArgumentException if no {@code group.id} is configured but auto-commit is enabled.
     */
    protected HonoKafkaConsumer(
            final Vertx vertx,
            final Set<String> topics,
            final Pattern topicPattern,
            final Handler<KafkaConsumerRecord<String, Buffer>> recordHandler,
            final Map<String, String> consumerConfig) {
        this.vertx = Objects.requireNonNull(vertx);
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.recordHandler = Objects.requireNonNull(recordHandler);
        this.consumerConfig = Objects.requireNonNull(consumerConfig);

        // exactly one of the two subscription kinds must be given
        if ((topics == null) == (topicPattern == null)) {
            throw new NullPointerException("either topics or topicPattern has to be set");
        }
        if (!consumerConfig.containsKey("group.id")) {
            if ("true".equals(consumerConfig.get("enable.auto.commit"))) {
                throw new IllegalArgumentException("group.id config entry has to be set if auto-commit is enabled");
            }
            log.trace("no group.id set, using a random UUID as default and disabling auto-commit");
            consumerConfig.put("group.id", UUID.randomUUID().toString());
            consumerConfig.put("enable.auto.commit", "false");
        }
    }

    /**
     * Sets the handler to invoke (on the Vert.x context) with the newly assigned partitions.
     *
     * @param onPartitionsAssignedHandler The handler.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnPartitionsAssignedHandler(final Handler<Set<TopicPartition>> onPartitionsAssignedHandler) {
        this.onPartitionsAssignedHandler = Objects.requireNonNull(onPartitionsAssignedHandler);
    }

    /**
     * Sets the handler to invoke (on the Vert.x context) after a partition assignment, with the
     * complete set of partitions assigned to the underlying consumer at the time of the assignment.
     *
     * @param handler The handler.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnRebalanceDoneHandler(final Handler<Set<TopicPartition>> handler) {
        this.onRebalanceDoneHandler = Objects.requireNonNull(handler);
    }

    /**
     * Sets the handler to invoke (on the Vert.x context) with partitions that were revoked or lost.
     *
     * @param onPartitionsRevokedHandler The handler.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnPartitionsRevokedHandler(final Handler<Set<TopicPartition>> onPartitionsRevokedHandler) {
        this.onPartitionsRevokedHandler = Objects.requireNonNull(onPartitionsRevokedHandler);
    }

    /**
     * Sets the metrics support with which the Kafka consumer is registered on start and
     * unregistered after it has been closed.
     *
     * @param metricsSupport The metrics support, may be {@code null}.
     */
    public final void setMetricsSupport(final KafkaClientMetricsSupport metricsSupport) {
        this.metricsSupport = metricsSupport;
    }

    /**
     * Defines whether records whose TTL has elapsed are skipped instead of being passed to the
     * record handler. The default is {@code true}.
     *
     * @param respectTtl {@code true} to skip expired records.
     */
    public final void setRespectTtl(final boolean respectTtl) {
        this.respectTtl = respectTtl;
    }

    /**
     * Sets a supplier for the underlying Kafka consumer. If set, {@link #start()} wraps the supplied
     * consumer instead of creating one from the consumer configuration.
     *
     * @param supplier The supplier, may be {@code null}.
     */
    public void setKafkaConsumerSupplier(final Supplier<Consumer<String, Buffer>> supplier) {
        this.kafkaConsumerSupplier = supplier;
    }

    /**
     * Pauses the consumer if it is not paused already.
     *
     * @return {@code true} if the consumer got paused by this call, {@code false} if it was paused already.
     * @throws IllegalStateException if the consumer has not been started yet.
     */
    public final boolean pause() {
        // resolve the consumer first so that the flag is left untouched if the consumer isn't started yet
        final KafkaConsumer<String, Buffer> consumer = getKafkaConsumer();
        if (!paused.compareAndSet(false, true)) {
            return false;
        }
        consumer.pause();
        return true;
    }

    /**
     * Resumes the consumer if it is currently paused.
     *
     * @return {@code true} if the consumer got resumed by this call, {@code false} if it was not paused.
     * @throws IllegalStateException if the consumer has not been started yet.
     */
    public final boolean resume() {
        // resolve the consumer first so that the flag is left untouched if the consumer isn't started yet
        final KafkaConsumer<String, Buffer> consumer = getKafkaConsumer();
        if (!paused.compareAndSet(true, false)) {
            return false;
        }
        consumer.resume();
        return true;
    }

    /**
     * Checks whether the consumer has been paused via {@link #pause()}.
     *
     * @return {@code true} if the consumer is marked as paused.
     */
    public final boolean isPaused() {
        return paused.get();
    }

    /**
     * Gets the Vert.x Kafka consumer.
     *
     * @return The consumer.
     * @throws IllegalStateException if the consumer has not been created yet (see {@link #start()}).
     */
    protected final KafkaConsumer<String, Buffer> getKafkaConsumer() {
        if (kafkaConsumer == null) {
            throw new IllegalStateException("consumer not initialized/started");
        }
        return kafkaConsumer;
    }

    /**
     * Gets the Kafka consumer wrapped by the Vert.x Kafka consumer.
     *
     * @return The underlying consumer.
     * @throws IllegalStateException if the consumer has not been created yet (see {@link #start()}).
     */
    protected final Consumer<String, Buffer> getUnderlyingConsumer() {
        if (kafkaConsumer == null) {
            throw new IllegalStateException("consumer not initialized/started");
        }
        return kafkaConsumer.asStream().unwrap();
    }

    /**
     * Creates the Kafka consumer, subscribes it and waits for the first partition assignment.
     * <p>
     * Records received before the returned future is completed are handled only after (and if)
     * the future has been completed successfully.
     *
     * @return A future that is succeeded once the subscription is done and partitions have been
     *         assigned. It is failed if subscribing fails, if no assignment is reported in time,
     *         or if creating/setting up the consumer throws an exception.
     */
    @Override
    public Future<Void> start() {
        context = vertx.getOrCreateContext();
        final Promise<Void> startPromise = Promise.promise();
        runOnContext(v -> {
            try {
                kafkaConsumer = Optional.ofNullable(kafkaConsumerSupplier)
                        .map(supplier -> KafkaConsumer.create(vertx, supplier.get()))
                        .orElseGet(() -> KafkaConsumer.create(vertx, consumerConfig, String.class, Buffer.class));
                Optional.ofNullable(metricsSupport)
                        .ifPresent(ms -> ms.registerKafkaConsumer(kafkaConsumer.unwrap()));

                kafkaConsumer.handler(record -> {
                    if (!startPromise.future().isComplete()) {
                        log.debug("postponing record handling until start() is completed [topic: {}, partition: {}, offset: {}]",
                                record.topic(), record.partition(), record.offset());
                    }
                    // records are only handled if (and after) start() has succeeded
                    startPromise.future().onSuccess(ok -> {
                        if (respectTtl && KafkaRecordHelper.isTtlElapsed(record.headers())) {
                            onRecordHandlerSkippedForExpiredRecord(record);
                        } else {
                            try {
                                recordHandler.handle(record);
                            } catch (final Exception e) {
                                // boundary to application code: an exception must not break record consumption
                                log.warn("error handling record [topic: {}, partition: {}, offset: {}, headers: {}]",
                                        record.topic(), record.partition(), record.offset(), record.headers(), e);
                            }
                        }
                    });
                });
                kafkaConsumer.exceptionHandler(error -> log.error("consumer error occurred [client.id: {}]",
                        consumerConfig.get("client.id"), error));
                installRebalanceListeners();
                // use a short poll timeout while subscribing; the regular timeout is applied once subscribed
                kafkaConsumer.asStream().pollTimeout(Duration.ofMillis(10L));
                subscribeAndWaitForRebalance()
                        .onSuccess(ok -> {
                            kafkaConsumer.asStream().pollTimeout(POLL_TIMEOUT);
                            logSubscribedTopicsOnStartComplete();
                        })
                        .onComplete(startPromise);
            } catch (final RuntimeException e) {
                // make sure the returned future doesn't stay incomplete forever if this code is run
                // asynchronously; rethrow so that a synchronous invocation still sees the exception
                startPromise.tryFail(e);
                throw e;
            }
        });
        return startPromise.future();
    }

    /**
     * Logs the subscription on debug level; for a pattern subscription the matching topics
     * are only listed individually if there are at most 5 of them.
     */
    private void logSubscribedTopicsOnStartComplete() {
        if (topicPattern == null) {
            log.debug("consumer started, subscribed to topics {}", topics);
            return;
        }
        final Set<String> matchingTopics = subscribedTopicPatternTopics;
        if (matchingTopics.size() <= 5) {
            log.debug("consumer started, subscribed to topic pattern [{}], matching topics: {}",
                    topicPattern, matchingTopics);
        } else {
            log.debug("consumer started, subscribed to topic pattern [{}], matching {} topics",
                    topicPattern, matchingTopics.size());
        }
    }

    /**
     * Invoked instead of the record handler for a record whose TTL has elapsed (only if TTL
     * handling is enabled).
     * <p>
     * This default implementation does nothing. Subclasses may override this method.
     *
     * @param record The expired record.
     */
    protected void onRecordHandlerSkippedForExpiredRecord(final KafkaConsumerRecord<String, Buffer> record) {
        // nothing done by default
    }

    /**
     * Replaces the rebalance listener of the Vert.x Kafka consumer with one that invokes the
     * blocking hooks directly and the registered handlers on the Vert.x context.
     */
    private void installRebalanceListeners() {
        replaceRebalanceListener(kafkaConsumer, new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsAssigned(final Collection<org.apache.kafka.common.TopicPartition> partitions) {
                final Set<TopicPartition> partitionsSet = Helper.from(partitions);
                if (log.isDebugEnabled()) {
                    log.debug("partitions assigned: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
                }
                // positions are fetched using the topics known *before* the subscription snapshot is refreshed
                ensurePositionsHaveBeenSetIfNeeded(partitionsSet, subscribedTopicPatternTopics);
                updateSubscribedTopicPatternTopics();
                onPartitionsAssignedBlocking(partitionsSet);
                // the overall assignment is read here (not on the Vert.x context) and only if it is needed
                final Set<TopicPartition> allAssignedPartitions = Optional.ofNullable(onRebalanceDoneHandler)
                        .map(h -> Helper.from(getKafkaConsumer().asStream().unwrap().assignment()))
                        .orElse(null);
                context.runOnContext(v -> {
                    HonoKafkaConsumer.this.onPartitionsAssigned(partitionsSet);
                    if (onRebalanceDoneHandler != null) {
                        onRebalanceDoneHandler.handle(allAssignedPartitions);
                    }
                });
            }

            @Override
            public void onPartitionsRevoked(final Collection<org.apache.kafka.common.TopicPartition> partitions) {
                final Set<TopicPartition> partitionsSet = Helper.from(partitions);
                if (log.isDebugEnabled()) {
                    log.debug("partitions revoked: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
                }
                onPartitionsRevokedBlocking(partitionsSet);
                context.runOnContext(v -> HonoKafkaConsumer.this.onPartitionsRevoked(partitionsSet));
            }

            @Override
            public void onPartitionsLost(final Collection<org.apache.kafka.common.TopicPartition> partitions) {
                final Set<TopicPartition> partitionsSet = Helper.from(partitions);
                if (log.isInfoEnabled()) {
                    log.info("partitions lost: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
                }
                // lost partitions are reported in the same way as revoked ones
                onPartitionsRevokedBlocking(partitionsSet);
                context.runOnContext(v -> HonoKafkaConsumer.this.onPartitionsRevoked(partitionsSet));
            }
        });
    }

    /**
     * Fetches the positions of those assigned partitions whose topic wasn't known before the
     * rebalance, unless {@code auto.offset.reset} is {@code earliest}.
     * Errors are logged and otherwise ignored.
     *
     * @param assignedPartitions The newly assigned partitions.
     * @param knownTopicsFromBeforeRebalance The topics known before the rebalance.
     */
    private void ensurePositionsHaveBeenSetIfNeeded(
            final Set<TopicPartition> assignedPartitions,
            final Set<String> knownTopicsFromBeforeRebalance) {
        if ("earliest".equals(consumerConfig.get("auto.offset.reset"))) {
            return;
        }
        final Set<TopicPartition> newPartitions = assignedPartitions.stream()
                .filter(tp -> !knownTopicsFromBeforeRebalance.contains(tp.getTopic()))
                .collect(Collectors.toSet());
        if (newPartitions.isEmpty()) {
            return;
        }
        try {
            log.trace("fetching positions for {} out of {} newly assigned partitions...",
                    newPartitions.size(), assignedPartitions.size());
            newPartitions.forEach(partition -> getUnderlyingConsumer().position(Helper.to(partition)));
            log.trace("done fetching positions for {} out of {} newly assigned partitions",
                    newPartitions.size(), assignedPartitions.size());
        } catch (final Exception e) {
            log.error("error fetching positions for {} out of {} newly assigned partitions",
                    newPartitions.size(), assignedPartitions.size(), e);
        }
    }

    /**
     * Replaces the snapshot of subscribed topics with the current subscription of the underlying
     * consumer. Does nothing if no topic pattern is used. Errors are logged and otherwise ignored.
     */
    private void updateSubscribedTopicPatternTopics() {
        if (topicPattern == null) {
            return;
        }
        try {
            subscribedTopicPatternTopics = new HashSet<>(getUnderlyingConsumer().subscription());
        } catch (final Exception e) {
            log.warn("error getting subscription", e);
        }
    }

    /**
     * Subscribes the consumer to the topics/topic pattern and waits for the next partition assignment.
     * <p>
     * Concurrent invocations share the same "assignment done" promise.
     *
     * @return A future that is succeeded when the subscription is done and a partition assignment
     *         has been reported. It is failed with a {@link ServerErrorException} (503) if the
     *         consumer is already stopped or if no assignment was reported within the timeout.
     */
    private Future<Void> subscribeAndWaitForRebalance() {
        if (stopCalled.get()) {
            return Future.failedFuture(new ServerErrorException(503, "already stopped"));
        }
        final Promise<Void> subscribeDonePromise = subscribeDonePromiseRef
                .updateAndGet(promise -> promise == null ? Promise.promise() : promise);

        final Promise<Void> subscriptionPromise = Promise.promise();
        if (topicPattern != null) {
            kafkaConsumer.subscribe(topicPattern, subscriptionPromise);
        } else {
            // look up the partitions of each topic, logging a hint if a topic turns out not to exist
            topics.forEach(topic -> {
                final Promise<List<PartitionInfo>> partitionsForPromise = Promise.promise();
                partitionsForPromise.future().onSuccess(partitions -> {
                    if (partitions.isEmpty()) {
                        log.info("subscription topic doesn't exist and didn't get auto-created: {}", topic);
                    }
                });
                HonoKafkaConsumerHelper.partitionsFor(kafkaConsumer, topic, partitionsForPromise);
            });
            kafkaConsumer.subscribe(topics, subscriptionPromise);
        }
        if (kafkaConsumerWorker == null) {
            kafkaConsumerWorker = getKafkaConsumerWorker(kafkaConsumer);
        }

        final long timerId = vertx.setTimer(WAIT_FOR_REBALANCE_TIMEOUT, tid -> {
            if (!subscribeDonePromise.future().isComplete()) {
                final String errorMsg = "timed out waiting for rebalance and update of subscribed topics";
                log.warn(errorMsg);
                // drop the promise so that subsequent invocations don't get the already failed promise
                subscribeDonePromiseRef.compareAndSet(subscribeDonePromise, null);
                subscribeDonePromise.tryFail(new ServerErrorException(503, errorMsg));
            }
        });
        // no need to keep the timer around once the outcome is known
        subscribeDonePromise.future().onComplete(ar -> vertx.cancelTimer(timerId));

        return CompositeFuture.all(subscriptionPromise.future(), subscribeDonePromise.future()).mapEmpty();
    }

    /**
     * Invoked directly from the rebalance listener callback when partitions have been assigned,
     * before the corresponding handlers are invoked on the Vert.x context.
     * <p>
     * This default implementation does nothing. Subclasses may override this method.
     *
     * @param partitionsSet The newly assigned partitions.
     */
    protected void onPartitionsAssignedBlocking(final Set<TopicPartition> partitionsSet) {
        // nothing done by default
    }

    /**
     * Completes a pending "assignment done" promise and notifies the partitions-assigned handler, if set.
     *
     * @param partitionsSet The newly assigned partitions.
     */
    private void onPartitionsAssigned(final Set<TopicPartition> partitionsSet) {
        Optional.ofNullable(subscribeDonePromiseRef.getAndSet(null)).ifPresent(Promise::tryComplete);
        if (onPartitionsAssignedHandler != null) {
            onPartitionsAssignedHandler.handle(partitionsSet);
        }
    }

    /**
     * Invoked directly from the rebalance listener callback when partitions have been revoked or
     * lost, before the corresponding handler is invoked on the Vert.x context.
     * <p>
     * This default implementation does nothing. Subclasses may override this method.
     *
     * @param partitionsSet The revoked or lost partitions.
     */
    protected void onPartitionsRevokedBlocking(final Set<TopicPartition> partitionsSet) {
        // nothing done by default
    }

    /**
     * Notifies the partitions-revoked handler, if set.
     *
     * @param partitionsSet The revoked or lost partitions.
     */
    private void onPartitionsRevoked(final Set<TopicPartition> partitionsSet) {
        if (onPartitionsRevokedHandler != null) {
            onPartitionsRevokedHandler.handle(partitionsSet);
        }
    }

    /**
     * Closes the Kafka consumer and unregisters it from the metrics support, if set.
     *
     * @return A future indicating the outcome of closing the consumer. The future is failed if the
     *         consumer has not been started. If this method has been invoked before, a succeeded
     *         future is returned right away (regardless of the outcome of the first invocation).
     */
    @Override
    public Future<Void> stop() {
        if (kafkaConsumer == null) {
            return Future.failedFuture("not started");
        }
        if (!stopCalled.compareAndSet(false, true)) {
            log.trace("stop already called");
            return Future.succeededFuture();
        }
        final Promise<Void> consumerClosePromise = Promise.promise();
        kafkaConsumer.close(consumerClosePromise);
        return consumerClosePromise.future()
                .onComplete(ar -> Optional.ofNullable(metricsSupport)
                        .ifPresent(ms -> ms.unregisterKafkaConsumer(kafkaConsumer.unwrap())));
    }

    /**
     * Runs the given code on the Vert.x context obtained in {@link #start()}.
     * The code is run immediately if the current context already is that context.
     *
     * @param codeToRun The code to run (invoked with a {@code null} argument).
     * @throws NullPointerException if codeToRun is {@code null}.
     */
    protected void runOnContext(final Handler<Void> codeToRun) {
        Objects.requireNonNull(codeToRun);
        if (context == Vertx.currentContext()) {
            codeToRun.handle(null);
        } else {
            context.runOnContext(go -> codeToRun.handle(null));
        }
    }

    /**
     * Submits the given handler to the worker executor of the Vert.x Kafka consumer.
     * <p>
     * The handler is not invoked if {@link #stop()} has been called before the handler gets
     * submitted or before it gets executed. Exceptions thrown by the handler are logged.
     *
     * @param handler The handler to run (invoked with a {@code null} argument).
     * @throws NullPointerException if handler is {@code null}.
     * @throws IllegalStateException if the worker executor has not been obtained yet, i.e. the
     *                               consumer has not subscribed yet.
     */
    protected void runOnKafkaWorkerThread(final Handler<Void> handler) {
        Objects.requireNonNull(handler);
        if (kafkaConsumerWorker == null) {
            throw new IllegalStateException("consumer not initialized/started");
        }
        if (stopCalled.get()) {
            return;
        }
        kafkaConsumerWorker.submit(() -> {
            // check again: stop() may have been called while the task was queued
            if (!stopCalled.get()) {
                try {
                    handler.handle(null);
                } catch (final Exception ex) {
                    log.error("error running task on Kafka worker thread", ex);
                }
            }
        });
    }

    /**
     * Gets a copy of the topics that matched the topic pattern at the time of the last
     * partition assignment.
     *
     * @return The topics, or an empty set if this consumer doesn't use a topic pattern.
     */
    public final Set<String> getSubscribedTopicPatternTopics() {
        if (topicPattern == null) {
            return Set.of();
        }
        return new HashSet<>(subscribedTopicPatternTopics);
    }

    /**
     * Checks whether the given topic is one of the configured topics or, if a topic pattern is
     * used, one of the topics known from the last partition assignment.
     *
     * @param topic The topic to check.
     * @return {@code true} if the topic is among the known subscribed topics.
     * @throws NullPointerException if topic is {@code null}.
     */
    public final boolean isAmongKnownSubscribedTopics(final String topic) {
        Objects.requireNonNull(topic);
        return topics != null
                ? topics.contains(topic)
                : subscribedTopicPatternTopics.contains(topic);
    }

    /**
     * Makes sure that the given topic is part of the topic pattern subscription, re-subscribing
     * and waiting for a rebalance (at most twice) if the topic isn't known yet.
     *
     * @param topic The topic to check.
     * @return A future that is succeeded if the topic is part of the subscription. Otherwise the
     *         future is failed with a {@link ServerErrorException}.
     * @throws NullPointerException if topic is {@code null}.
     * @throws IllegalStateException if this consumer doesn't use a topic pattern.
     * @throws IllegalArgumentException if the topic doesn't match the topic pattern.
     */
    public final Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopics(final String topic) {
        Objects.requireNonNull(topic);
        if (topics != null) {
            throw new IllegalStateException("consumer doesn't use topic pattern");
        }
        // NOTE(review): find() also accepts partial matches whereas the Kafka pattern subscription
        // presumably requires the whole topic name to match - confirm whether matches() is intended
        if (!topicPattern.matcher(topic).find()) {
            throw new IllegalArgumentException("topic doesn't match pattern");
        }
        if (kafkaConsumer == null) {
            return Future.failedFuture(new ServerErrorException(500, "not started"));
        }
        if (stopCalled.get()) {
            return Future.failedFuture(new ServerErrorException(503, "already stopped"));
        }
        if (subscribedTopicPatternTopics.contains(topic)) {
            log.debug("ensureTopicIsAmongSubscribedTopics: topic is already subscribed [{}]", topic);
            return Future.succeededFuture();
        }

        // look up the topic partitions; the outcome is only used for logging
        final Promise<List<PartitionInfo>> topicCheckFuture = Promise.promise();
        HonoKafkaConsumerHelper.partitionsFor(kafkaConsumer, topic, topicCheckFuture);
        topicCheckFuture.future()
                .onFailure(thr -> log.warn("ensureTopicIsAmongSubscribedTopics: error getting partitions for topic [{}]", topic, thr))
                .onSuccess(partitions -> {
                    if (partitions.isEmpty()) {
                        log.warn("ensureTopicIsAmongSubscribedTopics: topic doesn't exist and didn't get auto-created: {}", topic);
                    }
                });

        log.debug("ensureTopicIsAmongSubscribedTopics: wait for subscription update and rebalance [{}]", topic);
        return subscribeAndWaitForRebalance()
                .compose(v -> {
                    if (!subscribedTopicPatternTopics.contains(topic)) {
                        // the rebalance didn't bring the topic into the subscription yet; retry once
                        log.debug("ensureTopicIsAmongSubscribedTopics: subscription not updated with topic after rebalance; try again [topic: {}]", topic);
                        return subscribeAndWaitForRebalance();
                    }
                    return Future.succeededFuture(v);
                })
                .compose(v -> {
                    if (!subscribedTopicPatternTopics.contains(topic)) {
                        log.warn("ensureTopicIsAmongSubscribedTopics: subscription not updated with topic after rebalance [topic: {}]", topic);
                        return Future.failedFuture(new ServerErrorException(503, "subscription not updated with topic after rebalance"));
                    }
                    log.debug("ensureTopicIsAmongSubscribedTopics: done updating topic subscription [{}]", topic);
                    return Future.succeededFuture(v);
                });
    }

    /**
     * Sets the {@code rebalanceListener} field of the consumer's read stream via reflection.
     *
     * @param consumer The consumer whose listener is to be replaced.
     * @param listener The listener to set.
     * @throws IllegalArgumentException if the field could not be set.
     */
    private static void replaceRebalanceListener(
            final KafkaConsumer<String, Buffer> consumer,
            final ConsumerRebalanceListener listener) {
        try {
            final Field field = KafkaReadStreamImpl.class.getDeclaredField("rebalanceListener");
            field.setAccessible(true);
            field.set(consumer.asStream(), listener);
        } catch (final Exception e) {
            throw new IllegalArgumentException("Failed to adapt rebalance listener", e);
        }
    }

    /**
     * Reads the {@code worker} field of the consumer's read stream via reflection.
     *
     * @param consumer The consumer to get the worker executor of.
     * @return The worker executor.
     * @throws IllegalArgumentException if the field could not be read.
     * @throws IllegalStateException if the field value is {@code null}.
     */
    private static ExecutorService getKafkaConsumerWorker(final KafkaConsumer<String, Buffer> consumer) {
        final ExecutorService worker;
        try {
            final Field field = KafkaReadStreamImpl.class.getDeclaredField("worker");
            field.setAccessible(true);
            worker = (ExecutorService) field.get(consumer.asStream());
        } catch (final Exception e) {
            throw new IllegalArgumentException("Failed to get worker", e);
        }
        if (worker == null) {
            throw new IllegalStateException("worker not set");
        }
        return worker;
    }
}

