/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.consumer;

import io.quarkus.runtime.annotations.RegisterForReflection;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.common.metrics.Metrics;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumerHelper;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.util.Futures;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RegisterForReflection(targets={KafkaReadStreamImpl.class})
public class HonoKafkaConsumer
implements Lifecycle {
    // Maximum time subscribeAndWaitForRebalance() waits for the partition assignment before failing.
    private static final long WAIT_FOR_REBALANCE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(30L);
    // Delay before the metrics of topics that vanished from the pattern subscription get removed.
    private static final long OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(30L);
    // NOTE(review): not referenced in this file; the initial value of 'pollTimeout' below uses the same
    // number (250) as a literal - presumably the constant got inlined there. TODO confirm.
    private static final long DEFAULT_POLL_TIMEOUT_MILLIS = 250L;
    // Message of the IllegalStateException thrown when the consumer (or its worker) is not available yet.
    private static final String MSG_CONSUMER_NOT_INITIALIZED_STARTED = "consumer not initialized/started";
    // Logger named after the concrete (sub)class.
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    protected final Vertx vertx;
    // Set to true by the first effective stop() invocation.
    protected final AtomicBoolean stopCalled = new AtomicBoolean();
    // Kafka consumer configuration; the constructor adds group.id/enable.auto.commit entries if no group.id is set.
    protected final Map<String, String> consumerConfig;
    // Exactly one of 'topics' and 'topicPattern' is non-null (enforced by the constructor).
    protected final Set<String> topics;
    protected final Pattern topicPattern;
    // Pair of (subscription updated, partitions assigned) promises of the subscribeAndWaitForRebalance()
    // invocation currently in progress; null if there is none.
    private final AtomicReference<Pair<Promise<Void>, Promise<Void>>> subscriptionUpdatedAndPartitionsAssignedPromiseRef = new AtomicReference();
    // Handler invoked for each received record (unless the record is skipped because its TTL elapsed).
    private final Handler<KafkaConsumerRecord<String, Buffer>> recordHandler;
    // State flag of pauseRecordHandlingAndPolling()/resumeRecordHandlingAndPolling().
    private final AtomicBoolean pollingPaused = new AtomicBoolean();
    // State flag of pauseRecordFetching()/resumeRecordFetching(); also applied to newly assigned partitions.
    private final AtomicBoolean recordFetchingPaused = new AtomicBoolean();
    // Set in start() once the consumer has been created.
    private KafkaConsumer<String, Buffer> kafkaConsumer;
    // Vert.x context captured in start(); used for dispatching handler invocations.
    private Context context;
    // Worker executor of the Kafka read stream, obtained via reflection on first subscription.
    private ExecutorService kafkaConsumerWorker;
    // Topics currently matching the topic pattern; replaced (not mutated) on partition assignment.
    private volatile Set<String> subscribedTopicPatternTopics = new HashSet<String>();
    private Handler<Set<TopicPartition>> onPartitionsAssignedHandler;
    private Handler<Set<TopicPartition>> onRebalanceDoneHandler;
    private Handler<Set<TopicPartition>> onPartitionsRevokedHandler;
    private Handler<Set<TopicPartition>> onPartitionsLostHandler;
    // Whether records with an elapsed TTL are skipped instead of being passed to the record handler.
    private boolean respectTtl = true;
    // Poll timeout applied to the consumer after start() has completed the initial rebalance.
    private Duration pollTimeout = Duration.ofMillis(250L);
    // Optional supplier of the underlying Kafka consumer; if null, a KafkaClientFactory is used in start().
    private Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier;
    // Optional metrics support the consumer gets registered with / unregistered from.
    private KafkaClientMetricsSupport metricsSupport;
    // Id of the timer that automatically resumes polling; null if no such timer is active.
    private Long pollPauseTimeoutTimerId;
    // Passed to KafkaClientFactory.createKafkaConsumerWithRetries() in start().
    private Duration consumerCreationRetriesTimeout = Duration.ZERO;

    /**
     * Creates a consumer that subscribes to a fixed set of topics.
     *
     * @param vertx The Vert.x instance to use.
     * @param topics The names of the topics to consume from.
     * @param recordHandler The handler to invoke for each received record.
     * @param consumerConfig The Kafka consumer configuration (may get amended, see the protected constructor).
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public HonoKafkaConsumer(
            final Vertx vertx,
            final Set<String> topics,
            final Handler<KafkaConsumerRecord<String, Buffer>> recordHandler,
            final Map<String, String> consumerConfig) {
        this(vertx, Objects.requireNonNull(topics), (Pattern) null, recordHandler, consumerConfig);
    }

    /**
     * Creates a consumer that subscribes to all topics matching a pattern.
     *
     * @param vertx The Vert.x instance to use.
     * @param topicPattern The pattern of the topic names to consume from.
     * @param recordHandler The handler to invoke for each received record.
     * @param consumerConfig The Kafka consumer configuration (may get amended, see the protected constructor).
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public HonoKafkaConsumer(
            final Vertx vertx,
            final Pattern topicPattern,
            final Handler<KafkaConsumerRecord<String, Buffer>> recordHandler,
            final Map<String, String> consumerConfig) {
        this(vertx, (Set<String>) null, Objects.requireNonNull(topicPattern), recordHandler, consumerConfig);
    }

    protected HonoKafkaConsumer(Vertx vertx, Set<String> topics, Pattern topicPattern, Handler<KafkaConsumerRecord<String, Buffer>> recordHandler, Map<String, String> consumerConfig) {
        this.vertx = Objects.requireNonNull(vertx);
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.recordHandler = Objects.requireNonNull(recordHandler);
        this.consumerConfig = Objects.requireNonNull(consumerConfig);
        if (topics == null == (topicPattern == null)) {
            throw new NullPointerException("either topics or topicPattern has to be set");
        }
        if (!consumerConfig.containsKey("group.id")) {
            if ("true".equals(consumerConfig.get("enable.auto.commit"))) {
                throw new IllegalArgumentException("group.id config entry has to be set if auto-commit is enabled");
            }
            this.log.trace("no group.id set, using a random UUID as default and disabling auto-commit");
            consumerConfig.put("group.id", UUID.randomUUID().toString());
            consumerConfig.put("enable.auto.commit", "false");
        }
    }

    /**
     * Sets the handler to invoke (on the consumer's Vert.x context) with the newly assigned
     * partitions after a partition assignment.
     *
     * @param onPartitionsAssignedHandler The handler.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnPartitionsAssignedHandler(final Handler<Set<TopicPartition>> onPartitionsAssignedHandler) {
        Objects.requireNonNull(onPartitionsAssignedHandler);
        this.onPartitionsAssignedHandler = onPartitionsAssignedHandler;
    }

    /**
     * Sets the handler to invoke (on the consumer's Vert.x context) after a partition assignment
     * has been processed. The handler receives the complete set of partitions assigned to the
     * consumer at the time of the assignment callback.
     *
     * @param handler The handler.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnRebalanceDoneHandler(final Handler<Set<TopicPartition>> handler) {
        Objects.requireNonNull(handler);
        this.onRebalanceDoneHandler = handler;
    }

    /**
     * Sets the handler to invoke (on the consumer's Vert.x context) with the partitions that
     * have been revoked from this consumer.
     *
     * @param onPartitionsRevokedHandler The handler.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnPartitionsRevokedHandler(final Handler<Set<TopicPartition>> onPartitionsRevokedHandler) {
        Objects.requireNonNull(onPartitionsRevokedHandler);
        this.onPartitionsRevokedHandler = onPartitionsRevokedHandler;
    }

    /**
     * Sets the handler to invoke (on the consumer's Vert.x context) with the partitions that
     * this consumer has lost.
     *
     * @param onPartitionsLostHandler The handler.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnPartitionsLostHandler(final Handler<Set<TopicPartition>> onPartitionsLostHandler) {
        Objects.requireNonNull(onPartitionsLostHandler);
        this.onPartitionsLostHandler = onPartitionsLostHandler;
    }

    /**
     * Sets the metrics support the Kafka consumer gets registered with in {@code start()} and
     * unregistered from in {@code stop()}.
     *
     * @param metricsSupport The metrics support; {@code null} disables the registration.
     */
    public final void setMetricsSupport(final KafkaClientMetricsSupport metricsSupport) {
        this.metricsSupport = metricsSupport;
    }

    /**
     * Sets the timeout that is handed to the Kafka client factory when the consumer gets created
     * with retries in {@code start()}. Has no visible effect if a consumer supplier is set.
     *
     * @param consumerCreationRetriesTimeout The timeout (initially {@link Duration#ZERO}).
     */
    public final void setConsumerCreationRetriesTimeout(final Duration consumerCreationRetriesTimeout) {
        this.consumerCreationRetriesTimeout = consumerCreationRetriesTimeout;
    }

    /**
     * Defines whether records whose TTL has elapsed are skipped instead of being passed to the
     * record handler.
     *
     * @param respectTtl {@code true} (the initial value) to skip expired records.
     */
    public final void setRespectTtl(final boolean respectTtl) {
        this.respectTtl = respectTtl;
    }

    /**
     * Sets the poll timeout of the consumer.
     * <p>
     * The value is stored and, if the consumer has already been created, applied to its read
     * stream right away.
     *
     * @param pollTimeout The poll timeout.
     * @throws NullPointerException if pollTimeout is {@code null}.
     */
    public final void setPollTimeout(final Duration pollTimeout) {
        this.pollTimeout = Objects.requireNonNull(pollTimeout);
        final KafkaConsumer<String, Buffer> consumer = this.kafkaConsumer;
        if (consumer == null) {
            // not created yet - start() applies the stored value later
            return;
        }
        consumer.asStream().pollTimeout(pollTimeout);
    }

    /**
     * Sets a supplier for the underlying Kafka consumer. If set, {@code start()} wraps the supplied
     * consumer instead of creating one via the Kafka client factory.
     *
     * @param supplier The supplier, or {@code null} to use the client factory.
     */
    public void setKafkaConsumerSupplier(final Supplier<Consumer<String, Buffer>> supplier) {
        this.kafkaConsumerSupplier = supplier;
    }

    /**
     * Pauses fetching of records for all currently assigned partitions.
     * <p>
     * The actual pause is applied asynchronously on the Kafka worker thread.
     *
     * @return {@code true} if the state was changed, {@code false} if fetching was already paused.
     * @throws IllegalStateException if the Kafka worker is not available yet.
     */
    public final boolean pauseRecordFetching() {
        final boolean stateChanged = this.recordFetchingPaused.compareAndSet(false, true);
        if (stateChanged) {
            this.runOnKafkaWorkerThread(v -> {
                final Consumer<String, Buffer> consumer = this.getUnderlyingConsumer();
                final Set<org.apache.kafka.common.TopicPartition> assigned = consumer.assignment();
                if (!assigned.isEmpty()) {
                    consumer.pause(assigned);
                }
            });
        }
        return stateChanged;
    }

    /**
     * Resumes fetching of records for all currently assigned partitions.
     * <p>
     * The actual resume is applied asynchronously on the Kafka worker thread.
     *
     * @return {@code true} if the state was changed, {@code false} if fetching was not paused.
     * @throws IllegalStateException if the Kafka worker is not available yet.
     */
    public final boolean resumeRecordFetching() {
        final boolean stateChanged = this.recordFetchingPaused.compareAndSet(true, false);
        if (stateChanged) {
            this.runOnKafkaWorkerThread(v -> {
                final Consumer<String, Buffer> consumer = this.getUnderlyingConsumer();
                final Set<org.apache.kafka.common.TopicPartition> assigned = consumer.assignment();
                if (!assigned.isEmpty()) {
                    consumer.resume(assigned);
                }
            });
        }
        return stateChanged;
    }

    /**
     * Checks whether record fetching is currently marked as paused.
     *
     * @return {@code true} if {@code pauseRecordFetching()} was invoked without a subsequent resume.
     */
    public final boolean isRecordFetchingPaused() {
        final boolean paused = this.recordFetchingPaused.get();
        return paused;
    }

    /**
     * Pauses the consumer so that no more records are polled and handled, until either
     * {@link #resumeRecordHandlingAndPolling()} is invoked or the given timeout has elapsed.
     * <p>
     * All preconditions are checked before any state is modified, so a failing invocation
     * leaves the consumer in the non-paused state and without a pending resume timer.
     *
     * @param timeout The time after which polling gets resumed automatically.
     * @return {@code true} if the consumer got paused, {@code false} if it was already paused.
     * @throws NullPointerException if timeout is {@code null}.
     * @throws IllegalStateException if the consumer has not been created yet.
     */
    public final boolean pauseRecordHandlingAndPolling(final Duration timeout) {
        Objects.requireNonNull(timeout);
        final long timeoutMillis = timeout.toMillis();
        // fail fast before flipping the flag - otherwise a not-yet-started consumer
        // would stay marked as paused although nothing has been paused
        final KafkaConsumer<String, Buffer> consumer = this.getKafkaConsumer();
        if (!this.pollingPaused.compareAndSet(false, true)) {
            return false;
        }
        try {
            this.pollPauseTimeoutTimerId = this.vertx.setTimer(timeoutMillis, tid -> {
                this.pollPauseTimeoutTimerId = null;
                if (this.resumeRecordHandlingAndPolling()) {
                    this.log.debug("resumed consumer record polling - timeout of {}ms was reached [client-id: {}]", (Object)timeoutMillis, (Object)this.getClientId());
                }
            });
        } catch (final RuntimeException e) {
            // the timer could not be scheduled (e.g. invalid delay) - undo the state change
            this.pollingPaused.set(false);
            throw e;
        }
        consumer.pause();
        return true;
    }

    /**
     * Resumes polling and handling of records after a preceding
     * {@code pauseRecordHandlingAndPolling} invocation. A pending automatic-resume timer is cancelled.
     *
     * @return {@code true} if the consumer got resumed, {@code false} if it was not paused.
     * @throws IllegalStateException if the consumer has not been created yet.
     */
    public final boolean resumeRecordHandlingAndPolling() {
        final boolean wasPaused = this.pollingPaused.compareAndSet(true, false);
        if (wasPaused) {
            final Long timerId = this.pollPauseTimeoutTimerId;
            if (timerId != null) {
                this.vertx.cancelTimer(timerId.longValue());
                this.pollPauseTimeoutTimerId = null;
            }
            this.getKafkaConsumer().resume();
        }
        return wasPaused;
    }

    /**
     * Checks whether record handling and polling is currently marked as paused.
     *
     * @return {@code true} if polling is paused.
     */
    public final boolean isRecordHandlingAndPollingPaused() {
        final boolean paused = this.pollingPaused.get();
        return paused;
    }

    /**
     * Gets the Vert.x Kafka consumer created in {@code start()}.
     *
     * @return The consumer.
     * @throws IllegalStateException if the consumer has not been created yet.
     */
    protected final KafkaConsumer<String, Buffer> getKafkaConsumer() {
        final KafkaConsumer<String, Buffer> consumer = this.kafkaConsumer;
        if (consumer != null) {
            return consumer;
        }
        throw new IllegalStateException(MSG_CONSUMER_NOT_INITIALIZED_STARTED);
    }

    /**
     * Gets the native Kafka consumer wrapped by the Vert.x Kafka consumer's read stream.
     *
     * @return The native consumer.
     * @throws IllegalStateException if the consumer has not been created yet.
     */
    protected final Consumer<String, Buffer> getUnderlyingConsumer() {
        final KafkaConsumer<String, Buffer> consumer = this.kafkaConsumer;
        if (consumer != null) {
            return consumer.asStream().unwrap();
        }
        throw new IllegalStateException(MSG_CONSUMER_NOT_INITIALIZED_STARTED);
    }

    /**
     * Gets the value of the {@code client.id} entry of the consumer configuration.
     *
     * @return The client id, or {@code null} if the configuration has no such entry.
     */
    protected final String getClientId() {
        final String clientId = this.consumerConfig.get("client.id");
        return clientId;
    }

    /**
     * Creates the Kafka consumer, subscribes to the configured topics or topic pattern and waits
     * for the initial partition assignment.
     * <p>
     * The consumer is taken from the configured supplier if one is set; otherwise it is created
     * via a {@code KafkaClientFactory} using the configured creation-retries timeout.
     * Records received before the returned future has succeeded are handled only after that; if
     * the future fails, such records are not passed to the record handler.
     *
     * @return A future that is completed with the outcome of the subscription and initial
     *         rebalance, or failed if the consumer could not be created.
     */
    public Future<Void> start() {
        // all handler invocations get dispatched to this context
        this.context = this.vertx.getOrCreateContext();
        Promise startPromise = Promise.promise();
        this.runOnContext((Handler<Void>)((Handler)v -> Optional.ofNullable(this.kafkaConsumerSupplier).map(supplier -> Future.succeededFuture((Object)KafkaConsumer.create((Vertx)this.vertx, (Consumer)((Consumer)supplier.get())))).orElseGet(() -> {
            // no supplier set: let the factory create the consumer (with retries)
            KafkaClientFactory kafkaClientFactory = new KafkaClientFactory(this.vertx);
            return kafkaClientFactory.createKafkaConsumerWithRetries(this.consumerConfig, String.class, Buffer.class, this.consumerCreationRetriesTimeout);
        }).onFailure(thr -> {
            this.log.error("error creating consumer [client-id: {}]", (Object)this.getClientId(), thr);
            startPromise.fail(thr);
        }).onSuccess(consumer -> {
            this.kafkaConsumer = consumer;
            Optional.ofNullable(this.metricsSupport).ifPresent(ms -> ms.registerKafkaConsumer(this.kafkaConsumer.unwrap()));
            this.kafkaConsumer.handler(record -> {
                if (!startPromise.future().isComplete()) {
                    this.log.debug("postponing record handling until start() is completed [topic: {}, partition: {}, offset: {}]", new Object[]{record.topic(), record.partition(), record.offset()});
                }
                // handle the record once (and only if) start() has succeeded
                startPromise.future().onSuccess(v2 -> {
                    if (this.respectTtl && KafkaRecordHelper.isTtlElapsed(record.headers())) {
                        this.onRecordHandlerSkippedForExpiredRecord((KafkaConsumerRecord<String, Buffer>)record);
                    } else {
                        try {
                            this.recordHandler.handle(record);
                        }
                        catch (Exception e) {
                            // an exception thrown by the record handler is only logged
                            this.log.warn("error handling record [topic: {}, partition: {}, offset: {}, headers: {}]", new Object[]{record.topic(), record.partition(), record.offset(), record.headers(), e});
                        }
                    }
                });
            });
            this.kafkaConsumer.batchHandler(this::onBatchOfRecordsReceived);
            this.kafkaConsumer.exceptionHandler(error -> this.log.error("consumer error occurred [client-id: {}]", (Object)this.getClientId(), error));
            this.installRebalanceListeners();
            // use a short poll timeout while waiting for the initial rebalance;
            // the configured timeout is applied once the rebalance has succeeded
            this.kafkaConsumer.asStream().pollTimeout(Duration.ofMillis(10L));
            this.subscribeAndWaitForRebalance().onSuccess(v2 -> {
                this.kafkaConsumer.asStream().pollTimeout(this.pollTimeout);
                this.logSubscribedTopicsOnStartComplete();
            }).onComplete((Handler)startPromise);
        })));
        return startPromise.future();
    }

    /**
     * Logs (on DEBUG level) what the consumer is subscribed to. For a pattern subscription the
     * matching topics are listed if there are at most 5 of them, otherwise only their number.
     */
    private void logSubscribedTopicsOnStartComplete() {
        if (this.topicPattern == null) {
            this.log.debug("consumer started, subscribed to topics {}", this.topics);
            return;
        }
        final Set<String> matchingTopics = this.subscribedTopicPatternTopics;
        final int matchCount = matchingTopics.size();
        if (matchCount > 5) {
            this.log.debug("consumer started, subscribed to topic pattern [{}], matching {} topics", (Object)this.topicPattern, (Object)matchCount);
        } else {
            this.log.debug("consumer started, subscribed to topic pattern [{}], matching topics: {}", (Object)this.topicPattern, matchingTopics);
        }
    }

    /**
     * Hook registered as the batch handler of the Kafka consumer in {@code start()}.
     * <p>
     * This default implementation does nothing; subclasses may override it.
     *
     * @param records The batch of records passed to the batch handler.
     */
    protected void onBatchOfRecordsReceived(KafkaConsumerRecords<String, Buffer> records) {
    }

    /**
     * Hook invoked instead of the record handler for a record whose TTL has elapsed
     * (only if TTLs are respected, see {@code setRespectTtl}).
     * <p>
     * This default implementation does nothing; subclasses may override it.
     *
     * @param record The expired record that was skipped.
     */
    protected void onRecordHandlerSkippedForExpiredRecord(KafkaConsumerRecord<String, Buffer> record) {
    }

    /**
     * Replaces the rebalance listener of the Kafka consumer's read stream (via reflection) with
     * a listener that
     * <ul>
     * <li>invokes the {@code on...Blocking} hooks directly within the listener callback and</li>
     * <li>dispatches the invocation of the registered handlers to the Vert.x context captured in
     * {@code start()}.</li>
     * </ul>
     * NOTE(review): the listener callbacks presumably run on the Kafka polling (worker) thread -
     * confirm against the Kafka client documentation.
     */
    private void installRebalanceListeners() {
        HonoKafkaConsumer.replaceRebalanceListener(this.kafkaConsumer, new ConsumerRebalanceListener(){

            public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions) {
                Set partitionsSet = Helper.from(partitions);
                if (HonoKafkaConsumer.this.log.isDebugEnabled()) {
                    HonoKafkaConsumer.this.log.debug("partitions assigned: [{}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
                }
                HonoKafkaConsumer.this.ensurePositionsHaveBeenSetIfNeeded(partitionsSet);
                HonoKafkaConsumer.this.updateSubscribedTopicPatternTopicsAndRemoveMetrics();
                // keep record fetching paused for the newly assigned partitions as well
                if (HonoKafkaConsumer.this.recordFetchingPaused.get()) {
                    HonoKafkaConsumer.this.getUnderlyingConsumer().pause(partitions);
                }
                HonoKafkaConsumer.this.onPartitionsAssignedBlocking(partitionsSet);
                // the complete assignment is only determined if a rebalance-done handler is set
                Set allAssignedPartitions = Optional.ofNullable(HonoKafkaConsumer.this.onRebalanceDoneHandler).map(h -> Helper.from((Collection)HonoKafkaConsumer.this.getKafkaConsumer().asStream().unwrap().assignment())).orElse(null);
                HonoKafkaConsumer.this.context.runOnContext(v -> {
                    HonoKafkaConsumer.this.onPartitionsAssigned(partitionsSet);
                    if (HonoKafkaConsumer.this.onRebalanceDoneHandler != null) {
                        HonoKafkaConsumer.this.onRebalanceDoneHandler.handle((Object)allAssignedPartitions);
                    }
                });
            }

            public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions) {
                Set partitionsSet = Helper.from(partitions);
                if (HonoKafkaConsumer.this.log.isDebugEnabled()) {
                    HonoKafkaConsumer.this.log.debug("partitions revoked: [{}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
                }
                HonoKafkaConsumer.this.onPartitionsRevokedBlocking(partitionsSet);
                HonoKafkaConsumer.this.context.runOnContext(v -> HonoKafkaConsumer.this.onPartitionsRevoked(partitionsSet));
            }

            public void onPartitionsLost(Collection<org.apache.kafka.common.TopicPartition> partitions) {
                Set partitionsSet = Helper.from(partitions);
                if (HonoKafkaConsumer.this.log.isInfoEnabled()) {
                    HonoKafkaConsumer.this.log.info("partitions lost: [{}] [client-id: {}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(partitions), (Object)HonoKafkaConsumer.this.getClientId());
                }
                HonoKafkaConsumer.this.onPartitionsLostBlocking(partitionsSet);
                HonoKafkaConsumer.this.context.runOnContext(v -> HonoKafkaConsumer.this.onPartitionsLost(partitionsSet));
            }
        });
    }

    /**
     * Checks the positions of the newly assigned partitions if the offset reset policy is
     * "latest": a position that is smaller than the beginning offset of its partition is reset
     * to that beginning offset.
     * <p>
     * Errors occurring during the check are logged and not propagated.
     *
     * @param assignedPartitions The newly assigned partitions.
     */
    private void ensurePositionsHaveBeenSetIfNeeded(final Set<TopicPartition> assignedPartitions) {
        if (assignedPartitions.isEmpty() || !this.isAutoOffsetResetConfigLatest()) {
            return;
        }
        final int partitionCount = assignedPartitions.size();
        this.log.trace("checking positions for {} newly assigned partitions...", (Object)partitionCount);
        final Set<org.apache.kafka.common.TopicPartition> kafkaPartitions = Helper.to(assignedPartitions);
        try {
            final LinkedList<org.apache.kafka.common.TopicPartition> resetPartitions = new LinkedList<>();
            final Map<org.apache.kafka.common.TopicPartition, Long> beginningOffsets =
                    this.getUnderlyingConsumer().beginningOffsets(kafkaPartitions);
            for (final org.apache.kafka.common.TopicPartition partition : kafkaPartitions) {
                final long position = this.getUnderlyingConsumer().position(partition);
                final Long beginningOffset = beginningOffsets.get(partition);
                if (beginningOffset == null || position >= beginningOffset) {
                    continue;
                }
                this.log.debug("committed offset {} for [{}] is smaller than beginning offset, resetting it to the beginning offset {}", new Object[]{position, partition, beginningOffset});
                this.getUnderlyingConsumer().seek(partition, beginningOffset.longValue());
                resetPartitions.add(partition);
            }
            if (!resetPartitions.isEmpty()) {
                this.log.info("found out-of-range committed offsets, corresponding records having already been deleted; positions were reset to beginning offsets; partitions: [{}] [client-id: {}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(resetPartitions), (Object)this.getClientId());
            }
        } catch (final Exception e) {
            this.log.error("error checking positions for {} newly assigned partitions [client-id: {}]", new Object[]{partitionCount, this.getClientId(), e});
        }
        this.log.trace("done checking positions for {} newly assigned partitions", (Object)partitionCount);
    }

    /**
     * Checks whether the {@code partition.assignment.strategy} configuration entry is set to the
     * class name of the {@code CooperativeStickyAssignor}.
     *
     * @return {@code true} if the entry equals that class name, {@code false} otherwise
     *         (including the case that the entry is missing).
     */
    protected final boolean isCooperativeRebalancingConfigured() {
        final String strategy = this.consumerConfig.get("partition.assignment.strategy");
        return strategy != null && strategy.equals(CooperativeStickyAssignor.class.getName());
    }

    /**
     * Checks whether the {@code auto.offset.reset} configuration entry is "latest".
     *
     * @return {@code true} if the entry is "latest" or missing, {@code false} otherwise.
     */
    protected final boolean isAutoOffsetResetConfigLatest() {
        final String resetPolicy = this.consumerConfig.get("auto.offset.reset");
        return resetPolicy == null || resetPolicy.equals("latest");
    }

    /**
     * Refreshes the set of topics matching the topic pattern from the consumer's current
     * subscription and schedules the removal of the metrics of topics that are no longer part
     * of the subscription.
     * <p>
     * Does nothing if the consumer doesn't use a topic pattern. The metrics removal is done on
     * the Kafka worker thread after a delay and only for topics that are still absent by then.
     */
    private void updateSubscribedTopicPatternTopicsAndRemoveMetrics() {
        if (this.topicPattern == null) {
            return;
        }
        final Set<String> previousTopics = this.subscribedTopicPatternTopics;
        try {
            this.subscribedTopicPatternTopics = new HashSet<String>(this.getUnderlyingConsumer().subscription());
        } catch (final Exception e) {
            // keep the previously known topics
            this.log.warn("error getting subscription", (Throwable)e);
        }
        final Set<String> deletedTopics = previousTopics.stream()
                .filter(t -> !this.subscribedTopicPatternTopics.contains(t))
                .collect(Collectors.toSet());
        if (deletedTopics.isEmpty()) {
            return;
        }
        // re-check at removal time: a topic may have been re-created in the meantime
        final Handler<Void> removeObsoleteMetrics = v2 -> this.removeMetricsForDeletedTopics(
                deletedTopics.stream().filter(t -> !this.subscribedTopicPatternTopics.contains(t)));
        final Handler<Long> onDelayElapsed = tid -> this.runOnKafkaWorkerThread(removeObsoleteMetrics);
        this.runOnContext(v -> this.vertx.setTimer(OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS, onDelayElapsed));
    }

    /**
     * Subscribes to the configured topics or topic pattern and waits for the subscription update
     * and the subsequent partition assignment.
     * <p>
     * If another invocation is still in progress, no new subscription is triggered; the returned
     * future then reflects the outcome of that ongoing invocation.
     * The wait for the partition assignment is limited by a timeout; the corresponding timer is
     * cancelled as soon as the assignment outcome is known.
     *
     * @return A future that succeeds once the subscription has been updated and partitions have
     *         been assigned. It fails with a {@code ServerErrorException} (503) if the consumer
     *         is already stopped or the timeout was reached, or with the subscription error.
     */
    private Future<Void> subscribeAndWaitForRebalance() {
        if (this.stopCalled.get()) {
            return Future.failedFuture((Throwable)new ServerErrorException(503, "already stopped"));
        }
        final Promise<Void> partitionAssignmentDone = Promise.promise();
        final Promise<Void> subscriptionUpdated = Promise.promise();
        final Pair<Promise<Void>, Promise<Void>> newPromisePair = Pair.of(subscriptionUpdated, partitionAssignmentDone);
        // only install the new pair if no other invocation is in progress
        final Pair<Promise<Void>, Promise<Void>> promisePair = this.subscriptionUpdatedAndPartitionsAssignedPromiseRef
                .updateAndGet(current -> current == null ? newPromisePair : current);
        if (!promisePair.equals(newPromisePair)) {
            this.log.debug("subscribeAndWaitForRebalance: will wait for ongoing invocation to complete");
            return CompositeFuture.all(promisePair.one().future(), promisePair.two().future()).mapEmpty();
        }
        if (this.topicPattern != null) {
            this.kafkaConsumer.subscribe(this.topicPattern, subscriptionUpdated);
        } else {
            // only for logging purposes: check whether the topics actually exist
            this.topics.forEach(topic -> HonoKafkaConsumerHelper.partitionsFor(this.kafkaConsumer, topic).onSuccess(partitions -> {
                if (partitions.isEmpty()) {
                    this.log.info("subscription topic doesn't exist and didn't get auto-created: {} [client-id: {}]", topic, (Object)this.getClientId());
                }
            }));
            this.kafkaConsumer.subscribe(this.topics, subscriptionUpdated);
        }
        if (this.kafkaConsumerWorker == null) {
            this.kafkaConsumerWorker = HonoKafkaConsumer.getKafkaConsumerWorker(this.kafkaConsumer);
        }
        final long timeoutTimerId = this.vertx.setTimer(WAIT_FOR_REBALANCE_TIMEOUT_MILLIS, tid -> {
            if (!partitionAssignmentDone.future().isComplete()) {
                this.subscriptionUpdatedAndPartitionsAssignedPromiseRef.compareAndSet(promisePair, null);
                final String errorMsg = "timed out waiting for rebalance and update of subscribed topics";
                this.log.warn(errorMsg);
                partitionAssignmentDone.tryFail((Throwable)new ServerErrorException(503, errorMsg));
            }
        });
        // the timer has no purpose anymore once the assignment outcome is known
        partitionAssignmentDone.future().onComplete(ar -> this.vertx.cancelTimer(timeoutTimerId));
        subscriptionUpdated.future().onFailure(thr -> this.subscriptionUpdatedAndPartitionsAssignedPromiseRef.compareAndSet(promisePair, null));
        return CompositeFuture.all(subscriptionUpdated.future(), partitionAssignmentDone.future()).mapEmpty();
    }

    /**
     * Hook invoked directly from the rebalance listener callback when partitions have been
     * assigned, before the registered handlers get invoked on the Vert.x context.
     * <p>
     * This default implementation does nothing; subclasses may override it.
     *
     * @param partitionsSet The newly assigned partitions.
     */
    protected void onPartitionsAssignedBlocking(Set<TopicPartition> partitionsSet) {
    }

    /**
     * Completes the "partitions assigned" promise of a pending
     * {@code subscribeAndWaitForRebalance()} invocation (if any) and notifies the registered
     * partitions-assigned handler.
     *
     * @param partitionsSet The newly assigned partitions.
     */
    private void onPartitionsAssigned(final Set<TopicPartition> partitionsSet) {
        final Pair<Promise<Void>, Promise<Void>> pendingPromises =
                this.subscriptionUpdatedAndPartitionsAssignedPromiseRef.getAndSet(null);
        if (pendingPromises != null) {
            pendingPromises.two().tryComplete();
        }
        Optional.ofNullable(this.onPartitionsAssignedHandler)
                .ifPresent(handler -> handler.handle(partitionsSet));
    }

    /**
     * Hook invoked directly from the rebalance listener callback when partitions have been
     * revoked, before the registered handler gets invoked on the Vert.x context.
     * <p>
     * This default implementation does nothing; subclasses may override it.
     *
     * @param partitionsSet The revoked partitions.
     */
    protected void onPartitionsRevokedBlocking(Set<TopicPartition> partitionsSet) {
    }

    /**
     * Hook invoked directly from the rebalance listener callback when partitions have been
     * lost, before the registered handler gets invoked on the Vert.x context.
     * <p>
     * This default implementation does nothing; subclasses may override it.
     *
     * @param partitionsSet The lost partitions.
     */
    protected void onPartitionsLostBlocking(Set<TopicPartition> partitionsSet) {
    }

    /**
     * Notifies the registered partitions-revoked handler, if there is one.
     *
     * @param partitionsSet The revoked partitions.
     */
    private void onPartitionsRevoked(final Set<TopicPartition> partitionsSet) {
        Optional.ofNullable(this.onPartitionsRevokedHandler)
                .ifPresent(handler -> handler.handle(partitionsSet));
    }

    /**
     * Notifies the registered partitions-lost handler, if there is one.
     *
     * @param partitionsSet The lost partitions.
     */
    private void onPartitionsLost(final Set<TopicPartition> partitionsSet) {
        Optional.ofNullable(this.onPartitionsLostHandler)
                .ifPresent(handler -> handler.handle(partitionsSet));
    }

    /**
     * Closes the Kafka consumer and unregisters it from the metrics support (if set).
     * A pending timer for automatically resuming polling is cancelled in any case.
     *
     * @return A future indicating the outcome of closing the consumer. It is failed if the
     *         consumer has not been created yet and succeeded right away if stop was already
     *         called before.
     */
    public Future<Void> stop() {
        final Long resumeTimerId = this.pollPauseTimeoutTimerId;
        if (resumeTimerId != null) {
            this.vertx.cancelTimer(resumeTimerId.longValue());
            this.pollPauseTimeoutTimerId = null;
        }
        final KafkaConsumer<String, Buffer> consumer = this.kafkaConsumer;
        if (consumer == null) {
            return Future.failedFuture((String)"not started");
        }
        final boolean alreadyStopped = this.stopCalled.getAndSet(true);
        if (alreadyStopped) {
            this.log.trace("stop already called");
            return Future.succeededFuture();
        }
        final Promise<Void> closeDone = Promise.promise();
        consumer.close(closeDone);
        return closeDone.future().onComplete(ar -> {
            // unregister regardless of whether closing succeeded
            final KafkaClientMetricsSupport ms = this.metricsSupport;
            if (ms != null) {
                ms.unregisterKafkaConsumer(this.kafkaConsumer.unwrap());
            }
        });
    }

    /**
     * Runs the given code on the Vert.x context captured in {@code start()}: directly, if the
     * current context already is that context, otherwise by dispatching it to that context.
     *
     * @param codeToRun The code to run; it gets invoked with a {@code null} argument.
     * @throws NullPointerException if codeToRun is {@code null}.
     */
    protected void runOnContext(final Handler<Void> codeToRun) {
        Objects.requireNonNull(codeToRun);
        if (this.context == Vertx.currentContext()) {
            codeToRun.handle(null);
            return;
        }
        this.context.runOnContext(go -> codeToRun.handle(null));
    }

    /**
     * Submits the given handler to the worker executor of the Kafka consumer.
     * <p>
     * Nothing is submitted, respectively executed, if stop has been called in the meantime.
     * Exceptions thrown by the handler are logged and not propagated.
     *
     * @param handler The code to run; it gets invoked with a {@code null} argument.
     * @throws NullPointerException if handler is {@code null}.
     * @throws IllegalStateException if the worker executor is not available yet.
     */
    protected void runOnKafkaWorkerThread(final Handler<Void> handler) {
        Objects.requireNonNull(handler);
        final ExecutorService worker = this.kafkaConsumerWorker;
        if (worker == null) {
            throw new IllegalStateException(MSG_CONSUMER_NOT_INITIALIZED_STARTED);
        }
        if (this.stopCalled.get()) {
            return;
        }
        worker.submit(() -> {
            // stop() may have been called between submission and execution
            if (this.stopCalled.get()) {
                return;
            }
            try {
                handler.handle(null);
            } catch (final Exception ex) {
                this.log.error("error running task on Kafka worker thread [client-id: {}]", (Object)this.getClientId(), (Object)ex);
            }
        });
    }

    /**
     * Gets a copy of the set of topics currently known to match the topic pattern.
     *
     * @return The topics; an empty (immutable) set if the consumer doesn't use a topic pattern.
     */
    public final Set<String> getSubscribedTopicPatternTopics() {
        if (this.topicPattern != null) {
            return new HashSet<String>(this.subscribedTopicPatternTopics);
        }
        return Set.of();
    }

    /**
     * Checks whether the given topic is one of the explicitly configured topics or, for a
     * pattern subscription, one of the topics currently known to match the pattern.
     *
     * @param topic The topic name to check.
     * @return {@code true} if the topic is among the known subscribed topics.
     * @throws NullPointerException if topic is {@code null}.
     */
    public final boolean isAmongKnownSubscribedTopics(final String topic) {
        Objects.requireNonNull(topic);
        final Set<String> knownTopics = this.topics != null ? this.topics : this.subscribedTopicPatternTopics;
        return knownTopics.contains(topic);
    }

    /**
     * Makes sure that the given topic is part of the topics this consumer is subscribed to via
     * its topic pattern, triggering a subscription update and rebalance if needed.
     * <p>
     * If the offset reset policy is not "latest", the returned future is already completed once
     * the topic has been verified to exist; the subscription update then continues in the
     * background.
     *
     * @param topic The topic name.
     * @return A future indicating the outcome. It is failed with a {@code ServerErrorException}
     *         if the consumer is not started (500), already stopped (503), the topic doesn't
     *         exist (503) or the subscription doesn't contain the topic after the rebalance (503).
     * @throws NullPointerException if topic is {@code null}.
     * @throws IllegalStateException if the consumer doesn't use a topic pattern.
     * @throws IllegalArgumentException if the topic doesn't match the topic pattern.
     */
    public final Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopics(String topic) {
        Objects.requireNonNull(topic);
        if (this.topics != null) {
            throw new IllegalStateException("consumer doesn't use topic pattern");
        }
        if (!this.topicPattern.matcher(topic).find()) {
            throw new IllegalArgumentException("topic doesn't match pattern");
        }
        if (this.kafkaConsumer == null) {
            return Future.failedFuture((Throwable)new ServerErrorException(500, "not started"));
        }
        if (this.stopCalled.get()) {
            return Future.failedFuture((Throwable)new ServerErrorException(503, "already stopped"));
        }
        if (this.subscribedTopicPatternTopics.contains(topic)) {
            this.log.debug("ensureTopicIsAmongSubscribedTopics: topic is already subscribed [{}]", (Object)topic);
            return Future.succeededFuture();
        }
        // snapshot used below to detect whether some topic got deleted in the meantime
        HashSet<String> subscribedTopicPatternTopicsBefore = new HashSet<String>(this.subscribedTopicPatternTopics);
        Promise resultPromise = Promise.promise();
        // check that the topic exists; a failure of this check fails the overall result
        Future topicCheckFuture = HonoKafkaConsumerHelper.partitionsFor(this.kafkaConsumer, topic).onFailure(thr -> this.log.warn("ensureTopicIsAmongSubscribedTopics: error getting partitions for topic [{}]", (Object)topic, thr)).compose(partitions -> {
            if (partitions.isEmpty()) {
                this.log.warn("ensureTopicIsAmongSubscribedTopics: topic doesn't exist and didn't get auto-created: {}", (Object)topic);
                return Future.failedFuture((Throwable)new ServerErrorException(503, "command topic doesn't exist and didn't get auto-created"));
            }
            return Future.succeededFuture();
        }).onFailure(arg_0 -> ((Promise)resultPromise).tryFail(arg_0)).mapEmpty();
        this.log.debug("ensureTopicIsAmongSubscribedTopics: wait for subscription update and rebalance [{}]", (Object)topic);
        this.subscribeAndWaitForRebalance().compose(v -> {
            boolean someTopicDeleted = subscribedTopicPatternTopicsBefore.stream().anyMatch(t -> !this.subscribedTopicPatternTopics.contains(t));
            if (!this.subscribedTopicPatternTopics.contains(topic)) {
                // first rebalance didn't pick up the topic: do one more attempt
                this.log.debug("ensureTopicIsAmongSubscribedTopics: subscription not updated with topic after rebalance; try again [topic: {}]", (Object)topic);
                return this.subscribeAndWaitForRebalance();
            }
            if (this.isCooperativeRebalancingConfigured() && someTopicDeleted && this.isAutoOffsetResetConfigLatest()) {
                // the topic is subscribed, but its partitions may not be assigned yet:
                // in that case enforce another rebalance and wait for it
                return this.kafkaConsumer.assignment().compose(partitions -> {
                    if (partitions.stream().anyMatch(p -> p.getTopic().equals(topic))) {
                        return Future.succeededFuture((Object)v);
                    }
                    this.log.debug("ensureTopicIsAmongSubscribedTopics: wait for another rebalance before considering update of topic subscription [{}] as done", (Object)topic);
                    Promise rebalanceResultPromise = Promise.promise();
                    this.runOnKafkaWorkerThread((Handler<Void>)((Handler)v2 -> {
                        this.getUnderlyingConsumer().enforceRebalance();
                        this.runOnContext((Handler<Void>)((Handler)v3 -> this.subscribeAndWaitForRebalance().onComplete((Handler)rebalanceResultPromise)));
                    }));
                    return rebalanceResultPromise.future();
                });
            }
            return Future.succeededFuture((Object)v);
        }).compose(v -> {
            // final verification after the rebalance(s)
            if (!this.subscribedTopicPatternTopics.contains(topic)) {
                this.log.warn("ensureTopicIsAmongSubscribedTopics: subscription not updated with topic after rebalance [topic: {}]", (Object)topic);
                return Future.failedFuture((Throwable)new ServerErrorException(503, "subscription not updated with topic after rebalance"));
            }
            this.log.debug("ensureTopicIsAmongSubscribedTopics: done updating topic subscription [{}]", (Object)topic);
            return Future.succeededFuture((Object)v);
        }).onComplete(ar -> Futures.tryHandleResult((Promise)resultPromise, (AsyncResult)ar));
        if (!this.isAutoOffsetResetConfigLatest()) {
            // no need to wait for the rebalance before completing the result;
            // the subscription update triggered above still continues
            topicCheckFuture.onSuccess(v -> resultPromise.tryComplete());
        }
        return resultPromise.future();
    }

    /**
     * Removes the "bytes-fetched" and "records-fetched" sensors of the given topics from the
     * internal metrics object of the Kafka consumer. Does nothing if that object can't be obtained.
     *
     * @param deletedTopics The names of the topics whose sensors shall be removed.
     */
    private void removeMetricsForDeletedTopics(final Stream<String> deletedTopics) {
        final Consumer<String, Buffer> nativeConsumer = this.kafkaConsumer.unwrap();
        final Metrics consumerMetrics = this.getInternalMetricsObject(nativeConsumer);
        if (consumerMetrics == null) {
            return;
        }
        deletedTopics.forEach(topic -> {
            metrics_removeSensor:
            {
                consumerMetrics.removeSensor("topic." + topic + ".bytes-fetched");
                consumerMetrics.removeSensor("topic." + topic + ".records-fetched");
            }
        });
    }

    /**
     * Reads the private {@code metrics} field of the given consumer via reflection.
     *
     * @param consumer The consumer to get the metrics object from.
     * @return The metrics object, or {@code null} if the consumer is not a Kafka
     *         {@code KafkaConsumer} instance or the field could not be read (which gets logged).
     */
    private Metrics getInternalMetricsObject(final Consumer<String, Buffer> consumer) {
        if (!(consumer instanceof org.apache.kafka.clients.consumer.KafkaConsumer)) {
            return null;
        }
        try {
            final Field metricsField = org.apache.kafka.clients.consumer.KafkaConsumer.class.getDeclaredField("metrics");
            metricsField.setAccessible(true);
            return (Metrics)metricsField.get(consumer);
        } catch (final Exception e) {
            this.log.warn("failed to get metrics object", (Throwable)e);
            return null;
        }
    }

    /**
     * Sets the private {@code rebalanceListener} field of the consumer's read stream via reflection.
     *
     * @param consumer The consumer whose read stream shall use the listener.
     * @param listener The rebalance listener to set.
     * @throws IllegalArgumentException if the field could not be set.
     */
    private static void replaceRebalanceListener(final KafkaConsumer<String, Buffer> consumer, final ConsumerRebalanceListener listener) {
        final String fieldName = "rebalanceListener";
        try {
            final Field listenerField = KafkaReadStreamImpl.class.getDeclaredField(fieldName);
            listenerField.setAccessible(true);
            listenerField.set(consumer.asStream(), listener);
        } catch (final Exception e) {
            throw new IllegalArgumentException("Failed to adapt rebalance listener", e);
        }
    }

    /**
     * Reads the private {@code worker} field of the consumer's read stream via reflection.
     *
     * @param consumer The consumer to get the worker executor of.
     * @return The worker executor.
     * @throws IllegalArgumentException if the field could not be read.
     * @throws IllegalStateException if the field value is {@code null}.
     */
    private static ExecutorService getKafkaConsumerWorker(final KafkaConsumer<String, Buffer> consumer) {
        final ExecutorService workerExecutor;
        try {
            final Field workerField = KafkaReadStreamImpl.class.getDeclaredField("worker");
            workerField.setAccessible(true);
            workerExecutor = (ExecutorService)workerField.get(consumer.asStream());
        } catch (final Exception e) {
            throw new IllegalArgumentException("Failed to get worker", e);
        }
        return Optional.ofNullable(workerExecutor)
                .orElseThrow(() -> new IllegalStateException("worker not set"));
    }
}

