/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.impl.kafka;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.PartitionInfo;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Node;
import org.eclipse.hono.adapter.client.command.kafka.KafkaBasedInternalCommandSender;
import org.eclipse.hono.adapter.client.registry.TenantClient;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.commandrouter.impl.kafka.KafkaBasedMappingAndDelegatingCommandHandler;
import org.eclipse.hono.tracing.TracingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link CommandConsumerFactory} that uses a single Kafka consumer, subscribed to all topics
 * matching the command topic pattern, and hands every received record to a
 * {@link KafkaBasedMappingAndDelegatingCommandHandler}.
 * <p>
 * The set of topics the consumer is currently subscribed to is refreshed whenever partitions get
 * assigned. {@link #createCommandConsumer(String, SpanContext)} uses that set to decide whether the
 * command topic of a tenant is already covered or whether a subscription update has to be awaited.
 * <p>
 * NOTE(review): the mutable state of this class is not synchronized; this presumably relies on all
 * invocations happening on the same Vert.x context - confirm against the callers.
 */
public class KafkaBasedCommandConsumerFactoryImpl implements CommandConsumerFactory {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaBasedCommandConsumerFactoryImpl.class);
    /** Matches every topic whose name starts with the command topic prefix. */
    private static final Pattern COMMANDS_TOPIC_PATTERN = Pattern.compile(Pattern.quote(HonoTopic.Type.COMMAND.prefix) + ".*");
    /** Maximum time (in milliseconds) to wait for the subscribed topics to get updated. */
    private static final long WAIT_FOR_REBALANCE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
    /** Partition sets larger than this are only logged by their size. */
    private static final int MAX_PARTITIONS_IN_DEBUG_OUTPUT = 20;
    /** Error code used when this factory is used before it has been started. */
    private static final int HTTP_INTERNAL_ERROR = 500;
    /** Error code used when a topic or its subscription is (currently) not available. */
    private static final int HTTP_UNAVAILABLE = 503;
    private static final String REBALANCE_TIMEOUT_MSG = "timed out waiting for rebalance and update of subscribed topics";

    private final Supplier<KafkaConsumer<String, Buffer>> consumerCreator;
    private final KafkaBasedMappingAndDelegatingCommandHandler commandHandler;
    /**
     * Holds the promise to complete on the next update of {@link #subscribedTopics};
     * contains {@code null} if nobody is waiting for such an update.
     */
    private final AtomicReference<Promise<Void>> onSubscribedTopicsNextUpdated = new AtomicReference<>();
    private final Tracer tracer;
    private final Vertx vertx;
    /** The topics of the consumer subscription as of the last partition assignment. */
    private Set<String> subscribedTopics = new HashSet<>();
    /** The consumer created in {@link #start()}; {@code null} until then. */
    private KafkaConsumer<String, Buffer> kafkaConsumer;

    /**
     * Creates a new factory.
     * <p>
     * The Kafka consumer itself is not created here but when {@link #start()} gets invoked.
     *
     * @param vertx The Vert.x instance to use.
     * @param tenantClient The client passed on to the command handler.
     * @param commandTargetMapper The mapper passed on to the command handler.
     * @param kafkaProducerFactory The producer factory used for the internal command sender.
     * @param kafkaProducerConfig The producer configuration used for the internal command sender.
     * @param kafkaConsumerConfig The configuration from which the consumer properties are taken.
     * @param tracer The tracer to use.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public KafkaBasedCommandConsumerFactoryImpl(Vertx vertx, TenantClient tenantClient, CommandTargetMapper commandTargetMapper, KafkaProducerFactory<String, Buffer> kafkaProducerFactory, KafkaProducerConfigProperties kafkaProducerConfig, KafkaConsumerConfigProperties kafkaConsumerConfig, Tracer tracer) {
        this.vertx = Objects.requireNonNull(vertx);
        Objects.requireNonNull(tenantClient);
        Objects.requireNonNull(commandTargetMapper);
        Objects.requireNonNull(kafkaProducerFactory);
        Objects.requireNonNull(kafkaProducerConfig);
        Objects.requireNonNull(kafkaConsumerConfig);
        this.tracer = Objects.requireNonNull(tracer);

        KafkaBasedInternalCommandSender internalCommandSender = new KafkaBasedInternalCommandSender(kafkaProducerFactory, kafkaProducerConfig, tracer);
        this.commandHandler = new KafkaBasedMappingAndDelegatingCommandHandler(tenantClient, commandTargetMapper, internalCommandSender, tracer);

        Map<String, String> consumerConfig = kafkaConsumerConfig.getConsumerConfig("cmd-router");
        // the consumer group id is fixed for every instance of this class
        consumerConfig.put("group.id", "cmd-router-group");
        this.consumerCreator = () -> KafkaConsumer.create(vertx, consumerConfig, String.class, Buffer.class);
    }

    /**
     * Creates the Kafka consumer, registers the record, partition and exception handlers on it,
     * starts the command handler and subscribes to the command topic pattern.
     *
     * @return A future that succeeds once the command handler has been started, the subscription
     *         has been established and the subscribed topics have been updated for the first time.
     *         The future fails if any of these steps fails or the topics update times out.
     */
    public Future<Void> start() {
        kafkaConsumer = consumerCreator.get();
        kafkaConsumer
                .handler(commandHandler::mapAndDelegateIncomingCommandMessage)
                .partitionsAssignedHandler(this::onPartitionsAssigned)
                .partitionsRevokedHandler(this::onPartitionsRevoked)
                .exceptionHandler(error -> LOG.error("consumer error occurred", error));
        return CompositeFuture.all(commandHandler.start(), subscribeAndWaitForRebalanceAndTopicsUpdate())
                .map(ok -> {
                    LOG.debug("subscribed to topic pattern [{}], matching {} topics", COMMANDS_TOPIC_PATTERN, subscribedTopics.size());
                    return null;
                });
    }

    /**
     * (Re-)subscribes to the command topic pattern and waits for the next update of the
     * subscribed topics, which is triggered by a partition assignment.
     * <p>
     * Concurrent invocations share one pending promise for the topics update.
     *
     * @return A future that succeeds when both the subscribe operation and the topics update
     *         have completed. It fails with a {@link ServerErrorException} (503) if the topics
     *         update doesn't happen within {@link #WAIT_FOR_REBALANCE_TIMEOUT} milliseconds.
     */
    private Future<Void> subscribeAndWaitForRebalanceAndTopicsUpdate() {
        Promise<Void> subscribedTopicsPromise = onSubscribedTopicsNextUpdated
                .updateAndGet(promise -> promise == null ? Promise.promise() : promise);
        Promise<Void> subscriptionPromise = Promise.promise();
        kafkaConsumer.subscribe(COMMANDS_TOPIC_PATTERN, subscriptionPromise);

        long timerId = vertx.setTimer(WAIT_FOR_REBALANCE_TIMEOUT, id -> {
            if (!subscribedTopicsPromise.future().isComplete()) {
                // drop the promise from the shared reference BEFORE failing it; otherwise every
                // subsequent invocation would pick up the already failed promise and fail
                // immediately instead of waiting for the next partition assignment
                onSubscribedTopicsNextUpdated.compareAndSet(subscribedTopicsPromise, null);
                LOG.warn(REBALANCE_TIMEOUT_MSG);
                subscribedTopicsPromise.tryFail(new ServerErrorException(HTTP_UNAVAILABLE, REBALANCE_TIMEOUT_MSG));
            }
        });
        // no need to keep the timer around once the outcome is known
        subscribedTopicsPromise.future().onComplete(ar -> vertx.cancelTimer(timerId));

        return CompositeFuture.all(subscriptionPromise.future(), subscribedTopicsPromise.future()).mapEmpty();
    }

    /**
     * Refreshes {@link #subscribedTopics} from the consumer subscription and settles the promise
     * of callers waiting for that update (if any).
     *
     * @param partitionsSet The partitions that have been assigned; only used for logging.
     */
    private void onPartitionsAssigned(Set<TopicPartition> partitionsSet) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("partitions assigned: [{}]", getPartitionsDebugString(partitionsSet));
        }
        kafkaConsumer.subscription(ar -> {
            if (ar.succeeded()) {
                subscribedTopics = new HashSet<>(ar.result());
            } else {
                // keep the previously known topics in this case
                LOG.warn("failed to get subscription", ar.cause());
            }
            Promise<Void> waitingPromise = onSubscribedTopicsNextUpdated.getAndSet(null);
            if (waitingPromise != null) {
                if (ar.succeeded()) {
                    waitingPromise.tryComplete();
                } else {
                    waitingPromise.tryFail(ar.cause());
                }
            }
        });
    }

    /**
     * Logs the revoked partitions on debug level.
     *
     * @param partitionsSet The partitions that have been revoked.
     */
    private void onPartitionsRevoked(Set<TopicPartition> partitionsSet) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("partitions revoked: [{}]", getPartitionsDebugString(partitionsSet));
        }
    }

    /**
     * Creates a log-friendly representation of the given partitions.
     *
     * @param partitionsSet The partitions to describe.
     * @return The sorted partition numbers grouped by topic name, or just the number of
     *         partitions if there are more than {@link #MAX_PARTITIONS_IN_DEBUG_OUTPUT} of them.
     */
    private String getPartitionsDebugString(Set<TopicPartition> partitionsSet) {
        if (partitionsSet.size() > MAX_PARTITIONS_IN_DEBUG_OUTPUT) {
            return partitionsSet.size() + " topic partitions";
        }
        return partitionsSet.stream()
                .collect(Collectors.groupingBy(
                        TopicPartition::getTopic,
                        Collectors.mapping(TopicPartition::getPartition, Collectors.toCollection(TreeSet::new))))
                .toString();
    }

    /**
     * Stops the command handler and closes the Kafka consumer.
     *
     * @return A future that succeeds when both operations have succeeded. The future is failed
     *         if this factory has not been started.
     */
    public Future<Void> stop() {
        if (kafkaConsumer == null) {
            return Future.failedFuture("not started");
        }
        Promise<Void> consumerClosePromise = Promise.promise();
        kafkaConsumer.close(consumerClosePromise);
        return CompositeFuture.all(commandHandler.stop(), consumerClosePromise.future()).mapEmpty();
    }

    /**
     * Makes sure that the command topic of the given tenant is part of the consumer subscription.
     * <p>
     * If the topic is not among the currently known subscribed topics, its partitions are
     * requested (according to the log output this is meant to trigger auto-creation of the topic
     * if that is enabled on the broker) and, if the topic exists, a subscription update is
     * awaited. That waiting phase is tracked in a dedicated span.
     *
     * @param tenantId The tenant whose command topic shall be consumed.
     * @param context The span context to use as parent of the tracking span.
     * @return A future that succeeds once the topic is subscribed. It fails with a
     *         {@link ServerErrorException} with status 500 if this factory has not been started
     *         and with status 503 if the partitions could not be retrieved, the topic doesn't
     *         exist or the subscription did not get updated with the topic.
     */
    @Override
    public Future<Void> createCommandConsumer(String tenantId, SpanContext context) {
        if (kafkaConsumer == null) {
            return Future.failedFuture(new ServerErrorException(HTTP_INTERNAL_ERROR, "not started"));
        }
        String topic = new HonoTopic(HonoTopic.Type.COMMAND, tenantId).toString();
        if (subscribedTopics.contains(topic)) {
            LOG.debug("createCommandConsumer: topic is already subscribed [{}]", topic);
            return Future.succeededFuture();
        }
        LOG.debug("createCommandConsumer: topic not subscribed; check for its existence, triggering auto-creation if enabled [{}]", topic);
        Span span = TracingHelper
                .buildServerChildSpan(tracer, context, "wait for topic subscription update", CommandConsumerFactory.class.getSimpleName())
                .start();
        TracingHelper.TAG_TENANT_ID.set(span, tenantId);
        Tags.MESSAGE_BUS_DESTINATION.set(span, topic);

        Promise<List<PartitionInfo>> topicCheckFuture = Promise.promise();
        partitionsFor(topic, topicCheckFuture);
        return topicCheckFuture.future()
                .recover(thr -> {
                    LOG.warn("createCommandConsumer: error getting partitions for topic [{}]", topic, thr);
                    return Future.failedFuture(new ServerErrorException(HTTP_UNAVAILABLE, "error getting topic partitions", thr));
                })
                .compose(partitions -> awaitSubscriptionOfTopic(topic, partitions, span))
                .onComplete(ar -> {
                    if (ar.failed()) {
                        TracingHelper.logError(span, ar.cause());
                    }
                    span.finish();
                });
    }

    /**
     * Waits for the given topic to become part of the consumer subscription, provided the topic exists.
     *
     * @param topic The name of the topic.
     * @param partitions The partitions found for the topic; an empty list means that the topic
     *                   doesn't exist.
     * @param span The span to log progress to.
     * @return A future that succeeds once the topic is subscribed and fails with a
     *         {@link ServerErrorException} (503) otherwise.
     */
    private Future<Void> awaitSubscriptionOfTopic(String topic, List<PartitionInfo> partitions, Span span) {
        if (partitions.isEmpty()) {
            LOG.warn("createCommandConsumer: topic doesn't exist and didn't get auto-created: {}", topic);
            return Future.failedFuture(new ServerErrorException(HTTP_UNAVAILABLE, "topic doesn't exist and didn't get auto-created"));
        }
        // the subscribed topics may have been updated while the partitions were being fetched
        if (subscribedTopics.contains(topic)) {
            return Future.succeededFuture();
        }
        LOG.debug("createCommandConsumer: verified topic existence, wait for subscription update and rebalance [{}]", topic);
        span.log("verified topic existence, wait for subscription update and rebalance");
        return subscribeAndWaitForRebalanceAndTopicsUpdate().compose(v -> {
            if (!subscribedTopics.contains(topic)) {
                LOG.warn("createCommandConsumer: subscription not updated with topic after rebalance [topic: {}]", topic);
                return Future.failedFuture(new ServerErrorException(HTTP_UNAVAILABLE, "subscription not updated with topic after rebalance"));
            }
            LOG.debug("createCommandConsumer: done updating topic subscription");
            return Future.succeededFuture(v);
        });
    }

    /**
     * Gets the partitions of a topic via the read stream underlying the Kafka consumer,
     * converting the Kafka partition metadata into the corresponding Vert.x types.
     * <p>
     * A {@code null} result of the underlying operation is reported as an empty list.
     * NOTE(review): going through {@code asStream()} instead of calling
     * {@code KafkaConsumer.partitionsFor} directly presumably exists to get this
     * {@code null} handling - confirm.
     *
     * @param topic The name of the topic.
     * @param handler The handler to notify about the outcome.
     * @return The Kafka consumer of this factory.
     */
    private KafkaConsumer<String, Buffer> partitionsFor(String topic, Handler<AsyncResult<List<PartitionInfo>>> handler) {
        kafkaConsumer.asStream().partitionsFor(topic, done -> {
            if (done.failed()) {
                handler.handle(Future.failedFuture(done.cause()));
            } else if (done.result() == null) {
                handler.handle(Future.succeededFuture(List.of()));
            } else {
                List<PartitionInfo> partitions = new ArrayList<>();
                for (org.apache.kafka.common.PartitionInfo kafkaPartitionInfo : done.result()) {
                    PartitionInfo partitionInfo = new PartitionInfo();
                    partitionInfo
                            .setInSyncReplicas(
                                    Stream.of(kafkaPartitionInfo.inSyncReplicas()).map(Helper::from).collect(Collectors.toList()))
                            .setLeader(Helper.from(kafkaPartitionInfo.leader()))
                            .setPartition(kafkaPartitionInfo.partition())
                            .setReplicas(
                                    Stream.of(kafkaPartitionInfo.replicas()).map(Helper::from).collect(Collectors.toList()))
                            .setTopic(kafkaPartitionInfo.topic());
                    partitions.add(partitionInfo);
                }
                handler.handle(Future.succeededFuture(partitions));
            }
        });
        return kafkaConsumer;
    }
}

