/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.command.kafka;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandHandlerWrapper;
import org.eclipse.hono.client.command.CommandHandlers;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommand;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommandContext;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaAdminClientConfigProperties;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.client.kafka.tracing.KafkaTracingHelper;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBasedInternalCommandConsumer
implements Lifecycle {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBasedInternalCommandConsumer.class);
    private static final int NUM_PARTITIONS = 1;
    private static final String CLIENT_NAME = "internal-cmd";
    private final Supplier<KafkaConsumer<String, Buffer>> consumerCreator;
    private final String adapterInstanceId;
    private final String clientId;
    private final CommandHandlers commandHandlers;
    private final Tracer tracer;
    private final CommandResponseSender commandResponseSender;
    private final Admin adminClient;
    private final Map<String, Map<Integer, Long>> lastHandledPartitionOffsetsPerTenant = new HashMap<String, Map<Integer, Long>>();
    private KafkaConsumer<String, Buffer> consumer;
    private Context context;
    private KafkaClientMetricsSupport metricsSupport;

    public KafkaBasedInternalCommandConsumer(Vertx vertx, KafkaAdminClientConfigProperties adminClientConfigProperties, KafkaConsumerConfigProperties consumerConfigProperties, CommandResponseSender commandResponseSender, String adapterInstanceId, CommandHandlers commandHandlers, Tracer tracer) {
        Objects.requireNonNull(vertx);
        Objects.requireNonNull(adminClientConfigProperties);
        Objects.requireNonNull(consumerConfigProperties);
        this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
        this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
        this.commandHandlers = Objects.requireNonNull(commandHandlers);
        this.tracer = Objects.requireNonNull(tracer);
        Map adminClientConfig = adminClientConfigProperties.getAdminClientConfig(CLIENT_NAME);
        this.adminClient = Admin.create(new HashMap(adminClientConfig));
        Map consumerConfig = consumerConfigProperties.getConsumerConfig(CLIENT_NAME);
        consumerConfig.put("group.id", adapterInstanceId);
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
        this.clientId = (String)consumerConfig.get("client.id");
        this.consumerCreator = () -> KafkaConsumer.create((Vertx)vertx, (Map)consumerConfig, String.class, Buffer.class);
    }

    KafkaBasedInternalCommandConsumer(Context context, Admin kafkaAdminClient, KafkaConsumer<String, Buffer> kafkaConsumer, String clientId, CommandResponseSender commandResponseSender, String adapterInstanceId, CommandHandlers commandHandlers, Tracer tracer) {
        this.context = Objects.requireNonNull(context);
        this.adminClient = Objects.requireNonNull(kafkaAdminClient);
        this.consumer = Objects.requireNonNull(kafkaConsumer);
        this.clientId = Objects.requireNonNull(clientId);
        this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
        this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
        this.commandHandlers = Objects.requireNonNull(commandHandlers);
        this.tracer = Objects.requireNonNull(tracer);
        this.consumerCreator = () -> kafkaConsumer;
    }

    public final KafkaBasedInternalCommandConsumer setMetricsSupport(KafkaClientMetricsSupport metricsSupport) {
        this.metricsSupport = metricsSupport;
        return this;
    }

    public Future<Void> start() {
        if (this.context == null) {
            this.context = Vertx.currentContext();
            if (this.context == null) {
                return Future.failedFuture((Throwable)new IllegalStateException("Consumer must be started in a Vert.x context"));
            }
        }
        this.consumer = this.consumerCreator.get();
        Optional.ofNullable(this.metricsSupport).ifPresent(ms -> ms.registerKafkaConsumer(this.consumer.unwrap()));
        return this.createTopic().compose(v -> this.subscribeToTopic());
    }

    private Future<Void> createTopic() {
        Promise promise = Promise.promise();
        String topicName = this.getTopicName();
        NewTopic newTopic = new NewTopic(topicName, Optional.of(1), Optional.empty());
        this.adminClient.createTopics(List.of(newTopic)).all().whenComplete((v, ex) -> this.context.runOnContext(v1 -> Optional.ofNullable(ex).ifPresentOrElse(arg_0 -> ((Promise)promise).fail(arg_0), () -> ((Promise)promise).complete())));
        return promise.future().onSuccess(v -> LOG.debug("created topic [{}]", (Object)topicName)).onFailure(thr -> LOG.error("error creating topic [{}]", (Object)topicName, thr));
    }

    private Future<Void> subscribeToTopic() {
        this.consumer.handler(this::handleCommandMessage);
        this.consumer.exceptionHandler(thr -> LOG.error("consumer error occurred [adapterInstanceId: {}, clientId: {}]", new Object[]{this.adapterInstanceId, this.clientId, thr}));
        this.consumer.partitionsRevokedHandler(this::onPartitionsRevoked);
        Promise partitionAssignedPromise = Promise.promise();
        this.consumer.partitionsAssignedHandler(partitionsSet -> {
            LOG.debug("partitions assigned: {}", partitionsSet);
            partitionAssignedPromise.tryComplete();
        });
        String topicName = this.getTopicName();
        Promise subscribedPromise = Promise.promise();
        this.consumer.subscribe(topicName, (Handler)subscribedPromise);
        return CompositeFuture.all((Future)subscribedPromise.future(), (Future)partitionAssignedPromise.future()).map((Object)null).onComplete(ar -> this.consumer.partitionsAssignedHandler(this::onPartitionsAssigned)).onSuccess(v -> LOG.debug("subscribed and got partition assignment for topic [{}]", (Object)topicName)).onFailure(thr -> LOG.error("error subscribing to topic [{}]", (Object)topicName, thr));
    }

    private void onPartitionsAssigned(Set<TopicPartition> partitionsSet) {
        LOG.debug("partitions assigned: {}", partitionsSet);
    }

    private void onPartitionsRevoked(Set<TopicPartition> partitionsSet) {
        LOG.debug("partitions revoked: {}", partitionsSet);
    }

    private String getTopicName() {
        return new HonoTopic(HonoTopic.Type.COMMAND_INTERNAL, this.adapterInstanceId).toString();
    }

    public Future<Void> stop() {
        if (this.consumer == null) {
            return Future.failedFuture((String)"not started");
        }
        String topicName = this.getTopicName();
        Promise adminClientClosePromise = Promise.promise();
        LOG.debug("stop: delete topic [{}]", (Object)topicName);
        this.adminClient.deleteTopics(List.of(topicName)).all().whenComplete((v, ex) -> {
            if (ex != null) {
                LOG.warn("error deleting topic [{}]", (Object)topicName, ex);
            }
            this.context.executeBlocking(future -> {
                this.adminClient.close();
                future.complete();
            }, (Handler)adminClientClosePromise);
        });
        adminClientClosePromise.future().onComplete(ar -> LOG.debug("admin client closed"));
        Promise consumerClosePromise = Promise.promise();
        LOG.debug("stop: close consumer");
        this.consumer.close((Handler)consumerClosePromise);
        consumerClosePromise.future().onComplete(ar -> {
            LOG.debug("consumer closed");
            Optional.ofNullable(this.metricsSupport).ifPresent(ms -> ms.unregisterKafkaConsumer(this.consumer.unwrap()));
        });
        return CompositeFuture.all((Future)adminClientClosePromise.future(), (Future)consumerClosePromise.future()).mapEmpty();
    }

    void handleCommandMessage(KafkaConsumerRecord<String, Buffer> record) {
        KafkaBasedCommand command;
        Integer commandPartition = KafkaRecordHelper.getHeaderValue((List)record.headers(), (String)"orig-partition", Integer.class).orElse(null);
        Long commandOffset = KafkaRecordHelper.getHeaderValue((List)record.headers(), (String)"orig-offset", Long.class).orElse(null);
        if (commandPartition == null || commandOffset == null) {
            LOG.warn("command record is invalid - missing required original partition/offset headers");
            return;
        }
        try {
            command = KafkaBasedCommand.fromRoutedCommandRecord(record);
        }
        catch (IllegalArgumentException e) {
            LOG.warn("command record is invalid [tenant-id: {}, device-id: {}]", new Object[]{KafkaRecordHelper.getHeaderValue((List)record.headers(), (String)"tenant_id", String.class).orElse(""), KafkaRecordHelper.getHeaderValue((List)record.headers(), (String)"device_id", String.class).orElse(""), e});
            return;
        }
        Map lastHandledPartitionOffsets = this.lastHandledPartitionOffsetsPerTenant.computeIfAbsent(command.getTenant(), k -> new HashMap());
        Long lastHandledOffset = (Long)lastHandledPartitionOffsets.get(commandPartition);
        if (lastHandledOffset != null && commandOffset <= lastHandledOffset) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("ignoring command - record partition offset {} <= last handled offset {} [{}]", new Object[]{commandOffset, lastHandledOffset, command});
            }
        } else {
            lastHandledPartitionOffsets.put(commandPartition, commandOffset);
            CommandHandlerWrapper commandHandler = this.commandHandlers.getCommandHandler(command.getTenant(), command.getGatewayOrDeviceId());
            if (commandHandler != null && commandHandler.getGatewayId() != null) {
                command.setGatewayId(commandHandler.getGatewayId());
            }
            SpanContext spanContext = KafkaTracingHelper.extractSpanContext((Tracer)this.tracer, record);
            SpanContext followsFromSpanContext = commandHandler != null ? commandHandler.getConsumerCreationSpanContext() : null;
            Span currentSpan = CommandContext.createSpan((Tracer)this.tracer, (Command)command, (SpanContext)spanContext, (SpanContext)followsFromSpanContext);
            currentSpan.setTag("adapter_instance_id", this.adapterInstanceId);
            KafkaTracingHelper.TAG_OFFSET.set(currentSpan, Long.valueOf(record.offset()));
            KafkaBasedCommandContext commandContext = new KafkaBasedCommandContext(command, currentSpan, this.commandResponseSender);
            if (commandHandler != null) {
                LOG.trace("using [{}] for received command [{}]", (Object)commandHandler, (Object)command);
                commandHandler.handleCommand((CommandContext)commandContext);
            } else {
                LOG.info("no command handler found for command [{}]", (Object)command);
                commandContext.release((Throwable)new NoConsumerException("no command handler found for command"));
            }
        }
    }
}

