/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.consumer;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.PartitionInfo;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumerHelper;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A wrapper around a vert.x {@code KafkaConsumer} that subscribes either to a fixed set of topics
 * or to a topic pattern and hands received records to a record handler.
 * <p>
 * The wrapped consumer is created and subscribed in {@code start()} and closed in {@code stop()}.
 * Records whose TTL has elapsed are skipped unless that behaviour is disabled via
 * {@code setRespectTtl(false)}.
 */
public class HonoKafkaConsumer
implements Lifecycle {
    /** Time (in milliseconds) to wait for partitions to get assigned after a subscribe request. */
    private static final long WAIT_FOR_REBALANCE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
    /** Value put into the consumer config for "max.poll.interval.ms" if the config has no such entry. */
    private static final String DEFAULT_MAX_POLL_INTERNAL_MS = Long.toString(TimeUnit.SECONDS.toMillis(20L));
    /** Logger named after the runtime class, so that subclasses log under their own name. */
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    /** The vert.x instance used for creating the consumer, the context and timers. */
    protected final Vertx vertx;
    /** Set to {@code true} by the first invocation of {@code stop()} on a started consumer. */
    protected final AtomicBoolean stopCalled = new AtomicBoolean();
    /** The config map passed to the constructor; the constructor may add default entries to it. */
    private final Map<String, String> consumerConfig;
    /**
     * Holds the promise that gets completed once partitions have been assigned after a subscribe
     * request; contains {@code null} while no such request is pending.
     */
    private final AtomicReference<Promise<Void>> subscribeDonePromiseRef = new AtomicReference();
    /** Handler that received (non-expired) records are passed to. */
    private final Handler<KafkaConsumerRecord<String, Buffer>> recordHandler;
    /** The topics to subscribe to; {@code null} if a topic pattern is used instead. */
    private final Set<String> topics;
    /** The pattern of the topics to subscribe to; {@code null} if a fixed topic set is used instead. */
    private final Pattern topicPattern;
    /** The wrapped consumer; {@code null} until it has been created in {@code start()}. */
    private KafkaConsumer<String, Buffer> kafkaConsumer;
    /** The vert.x context obtained in {@code start()}; handlers are dispatched on it. */
    private Context context;
    /** The executor read reflectively from the consumer's read stream ("worker" field). */
    private ExecutorService kafkaConsumerWorker;
    /** The subscribed topics as last fetched after a partition assignment; only updated if a topic pattern is used. */
    private Set<String> subscribedTopicPatternTopics = new HashSet<String>();
    /** Optional handler invoked on the vert.x context with the newly assigned partitions. */
    private Handler<Set<TopicPartition>> onPartitionsAssignedHandler;
    /** Optional handler invoked on the vert.x context with all assigned partitions after an assignment. */
    private Handler<Set<TopicPartition>> onRebalanceDoneHandler;
    /** Optional handler invoked on the vert.x context with revoked (or lost) partitions. */
    private Handler<Set<TopicPartition>> onPartitionsRevokedHandler;
    /** Whether records with an elapsed TTL are skipped instead of being passed to the record handler. */
    private boolean respectTtl = true;
    /** Optional supplier of the underlying Kafka consumer; if unset, the consumer is created from the config. */
    private Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier;

    /**
     * Creates a consumer that subscribes to a fixed set of topics.
     *
     * @param vertx The vert.x instance to use.
     * @param topics The names of the topics to subscribe to.
     * @param recordHandler The handler to pass received records to.
     * @param consumerConfig The Kafka consumer configuration. The map gets amended with default
     *                       entries (e.g. "max.poll.interval.ms", "group.id") if these are missing,
     *                       so it has to be modifiable.
     * @throws NullPointerException if any of the parameters is {@code null}.
     * @throws IllegalArgumentException if the config has no "group.id" entry but sets
     *                                  "enable.auto.commit" to "true".
     */
    public HonoKafkaConsumer(
            final Vertx vertx,
            final Set<String> topics,
            final Handler<KafkaConsumerRecord<String, Buffer>> recordHandler,
            final Map<String, String> consumerConfig) {
        this(vertx, Objects.requireNonNull(topics), null, recordHandler, consumerConfig);
    }

    /**
     * Creates a consumer that subscribes to all topics matching a pattern.
     *
     * @param vertx The vert.x instance to use.
     * @param topicPattern The pattern of the topic names to subscribe to.
     * @param recordHandler The handler to pass received records to.
     * @param consumerConfig The Kafka consumer configuration. The map gets amended with default
     *                       entries (e.g. "max.poll.interval.ms", "group.id") if these are missing,
     *                       so it has to be modifiable.
     * @throws NullPointerException if any of the parameters is {@code null}.
     * @throws IllegalArgumentException if the config has no "group.id" entry but sets
     *                                  "enable.auto.commit" to "true".
     */
    public HonoKafkaConsumer(
            final Vertx vertx,
            final Pattern topicPattern,
            final Handler<KafkaConsumerRecord<String, Buffer>> recordHandler,
            final Map<String, String> consumerConfig) {
        this(vertx, null, Objects.requireNonNull(topicPattern), recordHandler, consumerConfig);
    }

    protected HonoKafkaConsumer(Vertx vertx, Set<String> topics, Pattern topicPattern, Handler<KafkaConsumerRecord<String, Buffer>> recordHandler, Map<String, String> consumerConfig) {
        this.vertx = Objects.requireNonNull(vertx);
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.recordHandler = Objects.requireNonNull(recordHandler);
        this.consumerConfig = Objects.requireNonNull(consumerConfig);
        if (topics == null == (topicPattern == null)) {
            throw new NullPointerException("either topics or topicPattern has to be set");
        }
        if (!consumerConfig.containsKey("max.poll.interval.ms")) {
            consumerConfig.put("max.poll.interval.ms", DEFAULT_MAX_POLL_INTERNAL_MS);
        }
        if (!consumerConfig.containsKey("group.id")) {
            if ("true".equals(consumerConfig.get("enable.auto.commit"))) {
                throw new IllegalArgumentException("group.id config entry has to be set if auto-commit is enabled");
            }
            this.log.trace("no group.id set, using a random UUID as default and disabling auto-commit");
            consumerConfig.put("group.id", UUID.randomUUID().toString());
            consumerConfig.put("enable.auto.commit", "false");
        }
    }

    /**
     * Sets the handler to be invoked (on the vert.x context) with the newly assigned partitions
     * whenever partitions have been assigned to this consumer.
     *
     * @param onPartitionsAssignedHandler The handler to set.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnPartitionsAssignedHandler(final Handler<Set<TopicPartition>> onPartitionsAssignedHandler) {
        Objects.requireNonNull(onPartitionsAssignedHandler);
        this.onPartitionsAssignedHandler = onPartitionsAssignedHandler;
    }

    /**
     * Sets the handler to be invoked (on the vert.x context) after a partition assignment has been
     * handled. The handler is given the complete set of partitions assigned to the consumer at the
     * time the assignment callback was invoked.
     *
     * @param handler The handler to set.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnRebalanceDoneHandler(final Handler<Set<TopicPartition>> handler) {
        Objects.requireNonNull(handler);
        this.onRebalanceDoneHandler = handler;
    }

    /**
     * Sets the handler to be invoked (on the vert.x context) with the affected partitions whenever
     * partitions have been revoked from this consumer or have been lost.
     *
     * @param onPartitionsRevokedHandler The handler to set.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnPartitionsRevokedHandler(final Handler<Set<TopicPartition>> onPartitionsRevokedHandler) {
        Objects.requireNonNull(onPartitionsRevokedHandler);
        this.onPartitionsRevokedHandler = onPartitionsRevokedHandler;
    }

    /**
     * Defines whether records with an elapsed TTL are skipped, i.e. not passed to the record handler.
     * The default is {@code true}.
     *
     * @param respectTtl {@code true} to skip expired records, {@code false} to handle all records.
     */
    public final void setRespectTtl(final boolean respectTtl) {
        this.respectTtl = respectTtl;
    }

    /**
     * Sets a supplier for the underlying Kafka consumer. If a (non-null) supplier is set when
     * {@code start()} is invoked, the supplied consumer is wrapped instead of creating a consumer
     * from the consumer configuration.
     *
     * @param supplier The supplier to use; may be {@code null} to create the consumer from the config.
     */
    public void setKafkaConsumerSupplier(final Supplier<Consumer<String, Buffer>> supplier) {
        this.kafkaConsumerSupplier = supplier;
    }

    /**
     * Gets the wrapped vert.x Kafka consumer.
     *
     * @return The consumer.
     * @throws IllegalStateException if the consumer has not been created yet, i.e. this consumer
     *                               has not been started.
     */
    protected final KafkaConsumer<String, Buffer> getKafkaConsumer() {
        final KafkaConsumer<String, Buffer> consumer = this.kafkaConsumer;
        if (consumer != null) {
            return consumer;
        }
        throw new IllegalStateException("consumer not initialized/started");
    }

    /**
     * Creates the Kafka consumer, registers the record handler and rebalance listener and
     * subscribes to the configured topics or topic pattern.
     * <p>
     * The consumer is created on the vert.x context obtained at the beginning of this method.
     * Any exception thrown while creating or initializing the consumer fails the returned future
     * instead of escaping from the context handler, which would leave the future uncompleted.
     * A consumer instance that has already been created at that point is kept, so that it can
     * still be closed via {@code stop()}.
     *
     * @return A future that is completed once the subscription has been established and partitions
     *         have been assigned. The future is failed if the consumer could not be created or
     *         initialized, if subscribing failed or if waiting for the partition assignment timed out.
     */
    public Future<Void> start() {
        this.context = this.vertx.getOrCreateContext();
        final Promise<Void> startPromise = Promise.promise();
        this.runOnContext(v -> {
            try {
                this.kafkaConsumer = Optional.ofNullable(this.kafkaConsumerSupplier)
                        .map(supplier -> KafkaConsumer.create(this.vertx, supplier.get()))
                        .orElseGet(() -> KafkaConsumer.create(this.vertx, this.consumerConfig, String.class, Buffer.class));
                this.kafkaConsumer.handler(record -> {
                    if (this.respectTtl && KafkaRecordHelper.isTtlElapsed(record.headers())) {
                        this.onRecordHandlerSkippedForExpiredRecord(record);
                    } else {
                        try {
                            this.recordHandler.handle(record);
                        } catch (final Exception e) {
                            // a failing record handler must not break consumption of subsequent records
                            this.log.warn("error handling record [topic: {}, partition: {}, offset: {}, headers: {}]",
                                    record.topic(), record.partition(), record.offset(), record.headers(), e);
                        }
                    }
                });
                this.kafkaConsumer.exceptionHandler(error -> this.log.error("consumer error occurred", error));
                this.installRebalanceListeners();
                this.subscribeAndWaitForRebalance()
                        .<Void>map(ok -> {
                            if (this.topicPattern != null) {
                                // only list the matching topics if there are just a few of them
                                if (this.subscribedTopicPatternTopics.size() <= 5) {
                                    this.log.debug("subscribed to topic pattern [{}], matching topics: {}",
                                            this.topicPattern, this.subscribedTopicPatternTopics);
                                } else {
                                    this.log.debug("subscribed to topic pattern [{}], matching {} topics",
                                            this.topicPattern, this.subscribedTopicPatternTopics.size());
                                }
                            } else {
                                this.log.debug("subscribed to topics {}", this.topics);
                            }
                            return null;
                        })
                        .onComplete(startPromise);
            } catch (final RuntimeException e) {
                // e.g. invalid consumer config or failure to replace the rebalance listener
                this.log.error("error creating or initializing consumer", e);
                startPromise.tryFail(e);
            }
        });
        return startPromise.future();
    }

    /**
     * Invoked instead of the record handler when a received record is skipped because its TTL has
     * elapsed (and TTLs are to be respected).
     * <p>
     * This default implementation does nothing. Subclasses may override this method.
     *
     * @param record The expired record that is being skipped.
     */
    protected void onRecordHandlerSkippedForExpiredRecord(KafkaConsumerRecord<String, Buffer> record) {
    }

    /**
     * Replaces the rebalance listener of the wrapped consumer with one that first invokes the
     * blocking hook methods directly from within the listener callback and then dispatches the
     * (non-blocking) handling to the vert.x context.
     */
    private void installRebalanceListeners() {
        final ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsAssigned(final Collection<org.apache.kafka.common.TopicPartition> partitions) {
                final Set<TopicPartition> assigned = Helper.from(partitions);
                if (log.isDebugEnabled()) {
                    log.debug("partitions assigned: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
                }
                onPartitionsAssignedBlocking(assigned);
                // the overall assignment is only determined if somebody is interested in it;
                // it is read here, in the listener callback, and not later on the vert.x context
                final Set<TopicPartition> allAssignedPartitions = onRebalanceDoneHandler != null
                        ? Helper.from(getKafkaConsumer().asStream().unwrap().assignment())
                        : null;
                context.runOnContext(v -> {
                    // qualified call: an unqualified one would resolve to the listener method of the same name
                    HonoKafkaConsumer.this.onPartitionsAssigned(assigned);
                    if (onRebalanceDoneHandler != null) {
                        onRebalanceDoneHandler.handle(allAssignedPartitions);
                    }
                });
            }

            @Override
            public void onPartitionsRevoked(final Collection<org.apache.kafka.common.TopicPartition> partitions) {
                if (log.isDebugEnabled()) {
                    log.debug("partitions revoked: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
                }
                handleRemovedPartitions(Helper.from(partitions));
            }

            @Override
            public void onPartitionsLost(final Collection<org.apache.kafka.common.TopicPartition> partitions) {
                if (log.isInfoEnabled()) {
                    log.info("partitions lost: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
                }
                handleRemovedPartitions(Helper.from(partitions));
            }

            // lost partitions are treated the same way as revoked ones
            private void handleRemovedPartitions(final Set<TopicPartition> removed) {
                onPartitionsRevokedBlocking(removed);
                context.runOnContext(v -> HonoKafkaConsumer.this.onPartitionsRevoked(removed));
            }
        };
        replaceRebalanceListener(this.kafkaConsumer, listener);
    }

    /**
     * Subscribes the wrapped consumer to the configured topics or topic pattern and waits for the
     * subsequent partition assignment.
     * <p>
     * Concurrent invocations share one "subscribe done" promise, which gets completed (and removed
     * from the reference) when partitions have been assigned. If that doesn't happen within
     * {@link #WAIT_FOR_REBALANCE_TIMEOUT} milliseconds, the promise is failed <em>and removed from
     * the reference</em>. Without the removal, the failed promise would be reused by every
     * following invocation, making them fail immediately until some rebalance clears the reference.
     *
     * @return A future that is succeeded once both the subscribe request has completed and
     *         partitions have been assigned. The future is failed with a
     *         {@code ServerErrorException} (503) if this consumer has already been stopped or if
     *         the timeout has elapsed, or with the subscribe failure cause.
     */
    private Future<Void> subscribeAndWaitForRebalance() {
        if (this.stopCalled.get()) {
            return Future.failedFuture(new ServerErrorException(503, "already stopped"));
        }
        // reuse the promise of a still pending subscribe request, if there is one
        final Promise<Void> subscribeDonePromise = this.subscribeDonePromiseRef
                .updateAndGet(promise -> promise == null ? Promise.promise() : promise);
        final Promise<Void> subscriptionPromise = Promise.promise();
        if (this.topicPattern != null) {
            this.kafkaConsumer.subscribe(this.topicPattern, subscriptionPromise);
        } else {
            this.kafkaConsumer.subscribe(this.topics, subscriptionPromise);
        }
        if (this.kafkaConsumerWorker == null) {
            this.kafkaConsumerWorker = HonoKafkaConsumer.getKafkaConsumerWorker(this.kafkaConsumer);
        }
        this.vertx.setTimer(WAIT_FOR_REBALANCE_TIMEOUT, timerId -> {
            if (!subscribeDonePromise.future().isComplete()) {
                // make sure the failed promise isn't handed out to subsequent invocations;
                // only clear the reference if it still holds this very promise
                this.subscribeDonePromiseRef.compareAndSet(subscribeDonePromise, null);
                final String errorMsg = "timed out waiting for rebalance and update of subscribed topics";
                this.log.warn(errorMsg);
                subscribeDonePromise.tryFail(new ServerErrorException(503, errorMsg));
            }
        });
        return CompositeFuture.all(subscriptionPromise.future(), subscribeDonePromise.future()).mapEmpty();
    }

    /**
     * Invoked synchronously from within the rebalance listener callback when partitions have been
     * assigned, before the assignment handling is dispatched to the vert.x context.
     * <p>
     * This default implementation does nothing. Subclasses may override this method.
     *
     * @param partitionsSet The newly assigned partitions.
     */
    protected void onPartitionsAssignedBlocking(Set<TopicPartition> partitionsSet) {
    }

    /**
     * Handles a partition assignment on the vert.x context.
     * <p>
     * Completes a pending "subscribe done" promise, if there is one. If a topic pattern is used,
     * the set of subscribed topics is fetched from the consumer first and the promise is completed
     * with the outcome of that request. Finally, the "partitions assigned" handler (if set) is
     * invoked; this happens without waiting for the fetched subscription.
     *
     * @param partitionsSet The newly assigned partitions.
     */
    private void onPartitionsAssigned(final Set<TopicPartition> partitionsSet) {
        if (topicPattern == null) {
            final Promise<Void> pendingPromise = subscribeDonePromiseRef.getAndSet(null);
            if (pendingPromise != null) {
                pendingPromise.tryComplete();
            }
        } else {
            // refresh the locally kept list of topics matching the pattern
            final Promise<Set<String>> subscriptionResultPromise = Promise.promise();
            kafkaConsumer.subscription(subscriptionResultPromise);
            subscriptionResultPromise.future()
                    .onSuccess(subscribedTopics -> {
                        subscribedTopicPatternTopics = new HashSet<>(subscribedTopics);
                    })
                    .onFailure(thr -> log.info("failed to get subscription", thr))
                    .map((Void) null)
                    .onComplete(ar -> {
                        final Promise<Void> pendingPromise = subscribeDonePromiseRef.getAndSet(null);
                        if (pendingPromise != null) {
                            tryCompletePromise(pendingPromise, ar);
                        }
                    });
        }
        final Handler<Set<TopicPartition>> assignedHandler = onPartitionsAssignedHandler;
        if (assignedHandler != null) {
            assignedHandler.handle(partitionsSet);
        }
    }

    /**
     * Invoked synchronously from within the rebalance listener callback when partitions have been
     * revoked or lost, before the handling is dispatched to the vert.x context.
     * <p>
     * This default implementation does nothing. Subclasses may override this method.
     *
     * @param partitionsSet The revoked (or lost) partitions.
     */
    protected void onPartitionsRevokedBlocking(Set<TopicPartition> partitionsSet) {
    }

    /**
     * Passes revoked (or lost) partitions on to the "partitions revoked" handler, if one is set.
     *
     * @param partitionsSet The revoked (or lost) partitions.
     */
    private void onPartitionsRevoked(final Set<TopicPartition> partitionsSet) {
        final Handler<Set<TopicPartition>> revokedHandler = onPartitionsRevokedHandler;
        if (revokedHandler == null) {
            return;
        }
        revokedHandler.handle(partitionsSet);
    }

    /**
     * Closes the wrapped Kafka consumer.
     *
     * @return A future indicating the outcome of closing the consumer. The future is failed if
     *         this consumer has not been started (no consumer has been created yet) and it is
     *         succeeded right away if this method has already been invoked before.
     */
    public Future<Void> stop() {
        if (kafkaConsumer == null) {
            return Future.failedFuture("not started");
        }
        final boolean firstInvocation = stopCalled.compareAndSet(false, true);
        if (firstInvocation) {
            final Promise<Void> closePromise = Promise.promise();
            kafkaConsumer.close(closePromise);
            return closePromise.future();
        }
        log.trace("stop already called");
        return Future.succeededFuture();
    }

    /**
     * Runs the given code on the vert.x context obtained in {@code start()}. The code is invoked
     * directly if the current context already is that context, otherwise it is scheduled on it.
     *
     * @param codeToRun The code to run; it is invoked with a {@code null} argument.
     * @throws NullPointerException if codeToRun is {@code null}.
     */
    protected void runOnContext(final Handler<Void> codeToRun) {
        Objects.requireNonNull(codeToRun);
        final boolean alreadyOnContext = context == Vertx.currentContext();
        if (alreadyOnContext) {
            codeToRun.handle(null);
            return;
        }
        context.runOnContext(go -> codeToRun.handle(null));
    }

    /**
     * Submits the given handler to the worker executor of the wrapped Kafka consumer.
     * <p>
     * Nothing is submitted if {@code stop()} has already been invoked; a handler that has been
     * submitted is also skipped if {@code stop()} got invoked in the meantime.
     *
     * @param handler The handler to invoke; it is invoked with a {@code null} argument.
     * @throws NullPointerException if handler is {@code null}.
     * @throws IllegalStateException if the worker executor is not available yet, i.e. the
     *                               consumer has not been initialized/started.
     */
    protected void runOnKafkaWorkerThread(final Handler<Void> handler) {
        Objects.requireNonNull(handler);
        if (kafkaConsumerWorker == null) {
            throw new IllegalStateException("consumer not initialized/started");
        }
        if (stopCalled.get()) {
            return;
        }
        kafkaConsumerWorker.submit(() -> {
            // re-check: stop() may have been called before the task got executed
            if (stopCalled.get()) {
                return;
            }
            handler.handle(null);
        });
    }

    /**
     * Gets the topics that matched the topic pattern as of the last update of the subscription
     * information kept by this consumer.
     *
     * @return A new set containing the topic names, or an empty (unmodifiable) set if this
     *         consumer doesn't use a topic pattern.
     */
    public final Set<String> getSubscribedTopicPatternTopics() {
        final boolean usesTopicPattern = topicPattern != null;
        return usesTopicPattern ? new HashSet<>(subscribedTopicPatternTopics) : Set.of();
    }

    /**
     * Checks whether the given topic is one of the topics this consumer is known to be subscribed
     * to: the configured topic set or, if a topic pattern is used, the locally kept set of topics
     * matching the pattern.
     *
     * @param topic The topic name to check.
     * @return {@code true} if the topic is among the known subscribed topics.
     * @throws NullPointerException if topic is {@code null}.
     */
    public final boolean isAmongKnownSubscribedTopics(final String topic) {
        Objects.requireNonNull(topic);
        final Set<String> knownTopics = topics != null ? topics : subscribedTopicPatternTopics;
        return knownTopics.contains(topic);
    }

    /**
     * Makes sure that the given topic is part of the topics this (pattern based) consumer is
     * subscribed to.
     * <p>
     * If the topic is not yet among the known subscribed topics, the partitions of the topic are
     * requested in order to verify that the topic exists. Then the subscription is renewed and the
     * following rebalance is awaited, retrying this once if the topic still isn't included afterwards.
     *
     * @param topic The topic name.
     * @return A future that is succeeded if the topic is among the subscribed topics. Otherwise the
     *         future is failed with a {@code ServerErrorException}.
     * @throws NullPointerException if topic is {@code null}.
     * @throws IllegalStateException if this consumer uses a fixed topic set instead of a pattern.
     * @throws IllegalArgumentException if the topic doesn't match the topic pattern.
     */
    public final Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopics(final String topic) {
        Objects.requireNonNull(topic);
        if (topics != null) {
            throw new IllegalStateException("consumer doesn't use topic pattern");
        }
        // NOTE(review): find() also accepts partial matches - confirm that this agrees with how
        // the Kafka client matches topic names against the subscribed pattern
        final boolean matchesPattern = topicPattern.matcher(topic).find();
        if (!matchesPattern) {
            throw new IllegalArgumentException("topic doesn't match pattern");
        }
        if (kafkaConsumer == null) {
            return Future.failedFuture(new ServerErrorException(500, "not started"));
        }
        if (stopCalled.get()) {
            return Future.failedFuture(new ServerErrorException(503, "already stopped"));
        }
        if (subscribedTopicPatternTopics.contains(topic)) {
            log.debug("ensureTopicIsAmongSubscribedTopics: topic is already subscribed [{}]", topic);
            return Future.succeededFuture();
        }

        // verify that the topic exists before waiting for a subscription update
        final Promise<List<PartitionInfo>> partitionsPromise = Promise.promise();
        HonoKafkaConsumerHelper.partitionsFor(kafkaConsumer, topic, partitionsPromise);
        return partitionsPromise.future()
                .recover(thr -> {
                    log.warn("ensureTopicIsAmongSubscribedTopics: error getting partitions for topic [{}]", topic, thr);
                    return Future.failedFuture(new ServerErrorException(503, "error getting topic partitions", thr));
                })
                .compose(partitions -> {
                    if (partitions.isEmpty()) {
                        log.warn("ensureTopicIsAmongSubscribedTopics: topic doesn't exist and didn't get auto-created: {}", topic);
                        return Future.failedFuture(new ServerErrorException(503, "topic doesn't exist and didn't get auto-created"));
                    }
                    // the subscription may have been updated while the partitions were fetched
                    if (subscribedTopicPatternTopics.contains(topic)) {
                        return Future.succeededFuture();
                    }
                    log.debug("ensureTopicIsAmongSubscribedTopics: verified topic existence, wait for subscription update and rebalance [{}]", topic);
                    return renewSubscriptionUntilTopicIncluded(topic);
                });
    }

    /**
     * Renews the subscription and waits for the rebalance, doing so a second time if the topic
     * isn't among the subscribed topics after the first attempt.
     *
     * @param topic The topic name expected to become part of the subscribed topics.
     * @return A future that is succeeded if the topic is among the subscribed topics in the end,
     *         otherwise failed.
     */
    private Future<Void> renewSubscriptionUntilTopicIncluded(final String topic) {
        return subscribeAndWaitForRebalance()
                .compose(v -> {
                    if (subscribedTopicPatternTopics.contains(topic)) {
                        return Future.succeededFuture(v);
                    }
                    log.debug("ensureTopicIsAmongSubscribedTopics: subscription not updated with topic after rebalance; try again [topic: {}]", topic);
                    return subscribeAndWaitForRebalance();
                })
                .compose(v -> {
                    if (subscribedTopicPatternTopics.contains(topic)) {
                        log.debug("ensureTopicIsAmongSubscribedTopics: done updating topic subscription");
                        return Future.succeededFuture(v);
                    }
                    log.warn("ensureTopicIsAmongSubscribedTopics: subscription not updated with topic after rebalance [topic: {}]", topic);
                    return Future.failedFuture(new ServerErrorException(503, "subscription not updated with topic after rebalance"));
                });
    }

    /**
     * Tries to complete the given promise with the outcome of the given result, i.e. with its
     * value if it succeeded and with its failure cause otherwise. Has no effect on an already
     * completed promise.
     *
     * @param promise The promise to complete.
     * @param asyncResult The result to complete the promise with.
     * @param <T> The type of the result value.
     */
    private static <T> void tryCompletePromise(final Promise<T> promise, final AsyncResult<T> asyncResult) {
        if (asyncResult.failed()) {
            promise.tryFail(asyncResult.cause());
            return;
        }
        promise.tryComplete(asyncResult.result());
    }

    /**
     * Sets the given listener as rebalance listener of the consumer's read stream by writing the
     * private "rebalanceListener" field of {@code KafkaReadStreamImpl} via reflection.
     *
     * @param consumer The consumer whose rebalance listener shall be replaced.
     * @param listener The listener to set.
     * @throws IllegalArgumentException if the field could not be accessed or set.
     */
    private static void replaceRebalanceListener(
            final KafkaConsumer<String, Buffer> consumer,
            final ConsumerRebalanceListener listener) {
        try {
            final Field listenerField = KafkaReadStreamImpl.class.getDeclaredField("rebalanceListener");
            listenerField.setAccessible(true);
            final Object readStream = consumer.asStream();
            listenerField.set(readStream, listener);
        } catch (final Exception cause) {
            throw new IllegalArgumentException("Failed to adapt rebalance listener", cause);
        }
    }

    /**
     * Gets the worker executor of the consumer's read stream by reading the private "worker" field
     * of {@code KafkaReadStreamImpl} via reflection.
     *
     * @param consumer The consumer to get the worker executor of.
     * @return The worker executor.
     * @throws IllegalArgumentException if the field could not be accessed or read.
     * @throws IllegalStateException if the field value is {@code null}.
     */
    private static ExecutorService getKafkaConsumerWorker(final KafkaConsumer<String, Buffer> consumer) {
        final ExecutorService workerExecutor;
        try {
            final Field workerField = KafkaReadStreamImpl.class.getDeclaredField("worker");
            workerField.setAccessible(true);
            final Object fieldValue = workerField.get(consumer.asStream());
            workerExecutor = (ExecutorService) fieldValue;
        } catch (final Exception cause) {
            throw new IllegalArgumentException("Failed to get worker", cause);
        }
        if (workerExecutor != null) {
            return workerExecutor;
        }
        throw new IllegalStateException("worker not set");
    }
}

