/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.command.kafka;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.errors.TopicExistsException;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandHandlerWrapper;
import org.eclipse.hono.client.command.CommandHandlers;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.command.InternalCommandConsumer;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommand;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommandContext;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaAdminClientConfigProperties;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.consumer.AsyncHandlingAutoCommitKafkaConsumer;
import org.eclipse.hono.client.kafka.consumer.MessagingKafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.client.kafka.tracing.KafkaTracingHelper;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.client.registry.TenantDisabledOrNotRegisteredException;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.LifecycleStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A consumer of command records published on the internal command topic of one
 * protocol adapter instance.
 * <p>
 * On {@link #start()} the consumer creates a Kafka admin client, makes sure that the
 * adapter instance specific topic exists (retrying periodically if the topic cannot be
 * created) and then creates and starts the Kafka consumer that subscribes to that topic.
 * Each received record is passed to {@link #handleCommandMessage(KafkaConsumerRecord)},
 * which looks up the matching command handler and the tenant configuration before
 * delegating to the handler.
 */
public class KafkaBasedInternalCommandConsumer implements InternalCommandConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaBasedInternalCommandConsumer.class);
    /** Number of partitions the internal command topic gets created with. */
    private static final int NUM_PARTITIONS = 1;
    /** Interval in milliseconds between two attempts to create the internal command topic. */
    private static final long CREATE_TOPIC_RETRY_INTERVAL = 1000L;

    private final Vertx vertx;
    private final Supplier<Future<AsyncHandlingAutoCommitKafkaConsumer<Buffer>>> consumerCreator;
    private final Supplier<Future<Admin>> kafkaAdminClientCreator;
    private final String adapterInstanceId;
    private final Duration pollTimeout;
    private final CommandHandlers commandHandlers;
    private final Tracer tracer;
    private final CommandResponseSender commandResponseSender;
    private final TenantClient tenantClient;
    /** Guards against overlapping topic creation attempts; set to {@code false} on stop. */
    private final AtomicBoolean retryCreateTopic = new AtomicBoolean(true);
    /**
     * Key is the tenant identifier, value maps an (original) partition index to the
     * (original) offset of the last command record handled for that partition.
     */
    private final Map<String, Map<Integer, Long>> lastHandledPartitionOffsetsPerTenant = new HashMap<>();
    private final LifecycleStatus lifecycleStatus = new LifecycleStatus();

    private AsyncHandlingAutoCommitKafkaConsumer<Buffer> consumer;
    private Admin adminClient;
    private Context context;
    private KafkaClientMetricsSupport metricsSupport;
    /** Identifier of the periodic topic creation timer, {@code null} if no retry is in progress. */
    private Long retryCreateTopicTimerId;

    /**
     * Creates a consumer that creates its Kafka clients from the given configuration.
     *
     * @param vertx The Vert.x instance to use.
     * @param adminClientConfigProperties The Kafka admin client configuration.
     * @param consumerConfigProperties The Kafka consumer configuration.
     * @param tenantClient The client used for retrieving the tenant configuration of a command.
     * @param commandResponseSender The sender passed to the created command contexts.
     * @param adapterInstanceId The adapter instance identifier; used as the consumer group id
     *                          and for deriving the topic name.
     * @param commandHandlers The registry of command handlers to dispatch commands to.
     * @param tracer The OpenTracing tracer.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public KafkaBasedInternalCommandConsumer(
            final Vertx vertx,
            final KafkaAdminClientConfigProperties adminClientConfigProperties,
            final MessagingKafkaConsumerConfigProperties consumerConfigProperties,
            final TenantClient tenantClient,
            final CommandResponseSender commandResponseSender,
            final String adapterInstanceId,
            final CommandHandlers commandHandlers,
            final Tracer tracer) {

        this.vertx = Objects.requireNonNull(vertx);
        Objects.requireNonNull(adminClientConfigProperties);
        Objects.requireNonNull(consumerConfigProperties);
        this.tenantClient = Objects.requireNonNull(tenantClient);
        this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
        this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
        this.commandHandlers = Objects.requireNonNull(commandHandlers);
        this.tracer = Objects.requireNonNull(tracer);
        this.pollTimeout = Duration.ofMillis(consumerConfigProperties.getPollTimeout());

        final Map<String, String> adminClientConfig = adminClientConfigProperties.getAdminClientConfig("internal-cmd-admin");
        final String bootstrapServersConfig = adminClientConfig.get("bootstrap.servers");
        final KafkaClientFactory kafkaClientFactory = new KafkaClientFactory(vertx);
        // client creation is retried for as long as this component is in "starting" state
        this.kafkaAdminClientCreator = () -> kafkaClientFactory.createClientWithRetries(
                () -> Admin.create(new HashMap<String, Object>(adminClientConfig)),
                lifecycleStatus::isStarting,
                bootstrapServersConfig,
                KafkaClientFactory.UNLIMITED_RETRIES_DURATION);

        final Map<String, String> consumerConfig = consumerConfigProperties.getConsumerConfig("internal-cmd");
        setFixedConsumerConfigValues(consumerConfig, adapterInstanceId);
        this.consumerCreator = () -> kafkaClientFactory.createClientWithRetries(
                () -> new AsyncHandlingAutoCommitKafkaConsumer<Buffer>(
                        vertx, Set.of(getTopicName()), this::handleCommandMessage, consumerConfig),
                lifecycleStatus::isStarting,
                consumerConfig.get("bootstrap.servers"),
                KafkaClientFactory.UNLIMITED_RETRIES_DURATION);
    }

    /**
     * Creates a consumer that uses the given, already existing Kafka clients.
     * <p>
     * Package-private; the poll timeout is fixed to 250 milliseconds.
     *
     * @param context The Vert.x context to run on.
     * @param kafkaAdminClient The Kafka admin client to use.
     * @param kafkaConsumer The Kafka consumer to use.
     * @param tenantClient The client used for retrieving the tenant configuration of a command.
     * @param commandResponseSender The sender passed to the created command contexts.
     * @param adapterInstanceId The adapter instance identifier.
     * @param commandHandlers The registry of command handlers to dispatch commands to.
     * @param tracer The OpenTracing tracer.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    KafkaBasedInternalCommandConsumer(
            final Context context,
            final Admin kafkaAdminClient,
            final Consumer<String, Buffer> kafkaConsumer,
            final TenantClient tenantClient,
            final CommandResponseSender commandResponseSender,
            final String adapterInstanceId,
            final CommandHandlers commandHandlers,
            final Tracer tracer) {

        this.context = Objects.requireNonNull(context);
        Objects.requireNonNull(kafkaAdminClient);
        Objects.requireNonNull(kafkaConsumer);
        this.tenantClient = Objects.requireNonNull(tenantClient);
        this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
        this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
        this.commandHandlers = Objects.requireNonNull(commandHandlers);
        this.tracer = Objects.requireNonNull(tracer);
        this.vertx = context.owner();
        this.pollTimeout = Duration.ofMillis(250L);

        final Map<String, String> consumerConfig = new HashMap<>();
        setFixedConsumerConfigValues(consumerConfig, adapterInstanceId);
        this.consumerCreator = () -> {
            final AsyncHandlingAutoCommitKafkaConsumer<Buffer> result = new AsyncHandlingAutoCommitKafkaConsumer<Buffer>(
                    this.vertx, Set.of(getTopicName()), this::handleCommandMessage, consumerConfig);
            result.setKafkaConsumerSupplier(() -> kafkaConsumer);
            return Future.succeededFuture(result);
        };
        this.kafkaAdminClientCreator = () -> Future.succeededFuture(kafkaAdminClient);
    }

    /**
     * Puts the consumer properties that must not be overridden by configuration
     * into the given map.
     *
     * @param consumerConfig The (mutable) consumer configuration to adapt.
     * @param adapterInstanceId The adapter instance identifier to use as consumer group id.
     */
    private void setFixedConsumerConfigValues(final Map<String, String> consumerConfig, final String adapterInstanceId) {
        consumerConfig.put("group.id", adapterInstanceId);
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
    }

    /**
     * Sets the Kafka metrics support that gets passed on to the Kafka consumer
     * when it is created during {@link #start()}.
     *
     * @param metricsSupport The metrics support (may be {@code null}).
     * @return This object for method chaining.
     */
    public final KafkaBasedInternalCommandConsumer setMetricsSupport(final KafkaClientMetricsSupport metricsSupport) {
        this.metricsSupport = metricsSupport;
        return this;
    }

    /**
     * Registers a handler with this component's lifecycle status as an "on started" handler.
     * <p>
     * The status is set to "started" once the Kafka consumer reports being ready.
     *
     * @param handler The handler to register; ignored if {@code null}.
     */
    public final void addOnKafkaConsumerReadyHandler(final Handler<AsyncResult<Void>> handler) {
        if (handler != null) {
            lifecycleStatus.addOnStartedHandler(handler);
        }
    }

    /**
     * Triggers the asynchronous creation of the admin client, the internal command
     * topic and the Kafka consumer.
     * <p>
     * The returned future does <em>not</em> reflect the outcome of these steps; it is
     * completed as soon as the start-up has been triggered. Use
     * {@link #addOnKafkaConsumerReadyHandler(Handler)} to register a handler with the
     * lifecycle status, which is set to "started" when the consumer is ready.
     *
     * @return A succeeded future if start-up has been triggered or is already in progress,
     *         a future failed with an {@link IllegalStateException} if the consumer is already
     *         started/stopping or if no Vert.x context is available.
     */
    public Future<Void> start() {
        if (lifecycleStatus.isStarting()) {
            return Future.succeededFuture();
        }
        if (!lifecycleStatus.setStarting()) {
            return Future.failedFuture(new IllegalStateException("consumer is already started/stopping"));
        }
        if (context == null) {
            context = Vertx.currentContext();
            if (context == null) {
                return Future.failedFuture(new IllegalStateException("Consumer must be started in a Vert.x context"));
            }
        }
        // trigger creation of admin client, adapter specific topic and consumer
        kafkaAdminClientCreator.get()
                .onFailure(thr -> LOG.error("admin client creation failed", thr))
                .compose(client -> {
                    adminClient = client;
                    return createTopic();
                })
                .recover(e -> {
                    if (adminClient == null) {
                        // the admin client itself could not be created: retrying the topic
                        // creation would only dereference the missing client on every attempt
                        return Future.failedFuture(e);
                    }
                    return retryCreateTopic();
                })
                .compose(v -> consumerCreator.get()
                        .onFailure(thr -> LOG.error("consumer creation failed", thr)))
                .compose(createdConsumer -> {
                    consumer = createdConsumer;
                    consumer.addOnKafkaConsumerReadyHandler(ar -> lifecycleStatus.setStarted());
                    consumer.setPollTimeout(pollTimeout);
                    consumer.setConsumerCreationRetriesTimeout(KafkaClientFactory.UNLIMITED_RETRIES_DURATION);
                    consumer.setMetricsSupport(metricsSupport);
                    return consumer.start();
                });
        return Future.succeededFuture();
    }

    /**
     * Registers a readiness check that succeeds once this consumer has been started.
     * <p>
     * While the consumer is still starting, the failed check result contains a
     * {@code status} property indicating which start-up step has not finished yet.
     *
     * @param readinessHandler The handler to register the check with.
     */
    public void registerReadinessChecks(final HealthCheckHandler readinessHandler) {
        LOG.trace("registering readiness check using kafka based internal command consumer [adapter instance id: {}]",
                adapterInstanceId);
        readinessHandler.register("internal-command-consumer[%s]-readiness".formatted(adapterInstanceId), status -> {
            if (lifecycleStatus.isStarted()) {
                status.tryComplete(Status.OK());
                return;
            }
            final JsonObject data = new JsonObject();
            if (lifecycleStatus.isStarting()) {
                if (adminClient == null) {
                    LOG.debug("readiness check failed, admin client not created yet (Kafka server URL possibly not resolvable (yet))");
                    data.put("status", "admin client not created yet (Kafka server URL possibly not resolvable (yet))");
                } else if (retryCreateTopicTimerId != null) {
                    LOG.debug("readiness check failed, internal command topic not created yet");
                    data.put("status", "internal command topic not created yet");
                } else if (consumer != null) {
                    LOG.debug("readiness check failed, consumer not ready yet");
                    data.put("status", "consumer not ready yet");
                } else {
                    LOG.debug("readiness check failed");
                }
            }
            status.tryComplete(Status.KO(data));
        });
    }

    /**
     * Does nothing; this component registers no liveness checks.
     *
     * @param livenessHandler The handler to register checks with (unused).
     */
    public void registerLivenessChecks(final HealthCheckHandler livenessHandler) {
        // no liveness checks to be added
    }

    /**
     * Creates the internal command topic of this adapter instance using the admin client.
     * <p>
     * A {@link TopicExistsException} reported by the admin client is treated as success.
     * The returned future is completed on this component's Vert.x context.
     *
     * @return A future indicating the outcome of the operation.
     */
    private Future<Void> createTopic() {
        final Promise<Void> promise = Promise.promise();
        final String topicName = getTopicName();
        // the replication factor is left empty here
        final NewTopic newTopic = new NewTopic(topicName, Optional.of(NUM_PARTITIONS), Optional.empty());

        adminClient.createTopics(List.of(newTopic))
                .all()
                .whenComplete((v, ex) -> context.runOnContext(v1 -> {
                    if (ex == null || ex instanceof TopicExistsException) {
                        promise.complete();
                    } else {
                        promise.fail(ex);
                    }
                }));
        return promise.future()
                .onSuccess(v -> LOG.debug("created topic [{}]", topicName))
                .onFailure(thr -> LOG.error("error creating topic [{}]", topicName, thr));
    }

    /**
     * Periodically tries to create the internal command topic until an attempt succeeds.
     * <p>
     * At most one attempt is in progress at any time. The returned future is only ever
     * completed successfully; it stays uncompleted if the timer gets cancelled (see
     * {@link #stop()}) before an attempt succeeds.
     *
     * @return A future that gets completed once the topic has been created.
     */
    private Future<Void> retryCreateTopic() {
        final Promise<Void> createTopicRetryPromise = Promise.promise();
        retryCreateTopicTimerId = vertx.setPeriodic(CREATE_TOPIC_RETRY_INTERVAL, id -> {
            // the flag is false while an attempt is running and after stop() has been invoked
            if (retryCreateTopic.compareAndSet(true, false)) {
                createTopic()
                        .onSuccess(ok -> {
                            retryCreateTopicTimerId = null;
                            vertx.cancelTimer(id);
                            createTopicRetryPromise.complete();
                        })
                        .onFailure(e -> retryCreateTopic.set(true));
            }
        });
        return createTopicRetryPromise.future();
    }

    /**
     * Gets the name of the internal command topic of this adapter instance.
     *
     * @return The topic name.
     */
    private String getTopicName() {
        return new HonoTopic(HonoTopic.Type.COMMAND_INTERNAL, adapterInstanceId).toString();
    }

    /**
     * Stops this consumer.
     * <p>
     * Cancels a pending topic creation retry, closes the admin client and stops the
     * Kafka consumer.
     *
     * @return A future indicating the outcome of the stop attempt.
     */
    public Future<Void> stop() {
        return lifecycleStatus.runStopAttempt(() -> {
            // prevent any further topic creation attempts
            retryCreateTopic.set(false);
            Optional.ofNullable(retryCreateTopicTimerId).ifPresent(vertx::cancelTimer);
            return CompositeFuture.all(closeAdminClient(), stopConsumer()).mapEmpty();
        });
    }

    /**
     * Closes the admin client (if one has been created) in a blocking code section
     * executed via this component's Vert.x context.
     *
     * @return A future indicating the outcome of the operation.
     */
    private Future<Void> closeAdminClient() {
        if (adminClient == null) {
            return Future.succeededFuture();
        }
        final Promise<Void> adminClientClosePromise = Promise.promise();
        LOG.debug("stop: close admin client");
        context.executeBlocking(future -> {
            adminClient.close();
            LOG.debug("admin client closed");
            future.complete();
        }, adminClientClosePromise);
        return adminClientClosePromise.future();
    }

    /**
     * Stops the Kafka consumer if one has been created.
     *
     * @return The outcome of stopping the consumer or a succeeded future if there is no consumer.
     */
    private Future<Void> stopConsumer() {
        if (consumer == null) {
            return Future.succeededFuture();
        }
        return consumer.stop();
    }

    /**
     * Handles a record received on the internal command topic.
     * <p>
     * Records lacking the original partition/offset headers and records that cannot be
     * converted into a command are rejected with a failed future. A record whose original
     * offset is not greater than the offset last handled for the same tenant and original
     * partition is ignored. Otherwise the tenant configuration is retrieved and the command
     * is passed to the matching command handler. If no handler is registered, or if the
     * tenant configuration cannot be retrieved, the command context is released/rejected
     * and the returned future is failed.
     *
     * @param record The record containing the command.
     * @return A future indicating the outcome of handling the command.
     */
    Future<Void> handleCommandMessage(final KafkaConsumerRecord<String, Buffer> record) {
        final Integer commandPartition = KafkaRecordHelper.getOriginalPartitionHeader(record.headers()).orElse(null);
        final Long commandOffset = KafkaRecordHelper.getOriginalOffsetHeader(record.headers()).orElse(null);
        if (commandPartition == null || commandOffset == null) {
            LOG.warn("command record is invalid - missing required original partition/offset headers");
            return Future.failedFuture("command record is invalid");
        }

        final KafkaBasedCommand command;
        try {
            command = KafkaBasedCommand.fromRoutedCommandRecord(record);
        } catch (final IllegalArgumentException e) {
            // the exception is passed as last argument so that it is logged as the cause
            LOG.warn("command record is invalid [tenant-id: {}, device-id: {}]",
                    KafkaRecordHelper.getTenantId(record.headers()).orElse(null),
                    KafkaRecordHelper.getDeviceId(record.headers()).orElse(null),
                    e);
            return Future.failedFuture("command record is invalid");
        }

        // skip records that have already been handled (identified by original partition/offset)
        final Map<Integer, Long> lastHandledPartitionOffsets = lastHandledPartitionOffsetsPerTenant
                .computeIfAbsent(command.getTenant(), k -> new HashMap<>());
        final Long lastHandledOffset = lastHandledPartitionOffsets.get(commandPartition);
        if (lastHandledOffset != null && commandOffset <= lastHandledOffset) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("ignoring command - record partition offset {} <= last handled offset {} [{}]",
                        commandOffset, lastHandledOffset, command);
            }
            return Future.succeededFuture();
        }
        lastHandledPartitionOffsets.put(commandPartition, commandOffset);

        final CommandHandlerWrapper commandHandler = commandHandlers.getCommandHandler(
                command.getTenant(), command.getGatewayOrDeviceId());
        if (commandHandler != null && commandHandler.getGatewayId() != null) {
            // the handler's gateway id takes precedence over the one contained in the command
            command.setGatewayId(commandHandler.getGatewayId());
        }

        final SpanContext spanContext = KafkaTracingHelper.extractSpanContext(tracer, record);
        final SpanContext followsFromSpanContext = commandHandler != null
                ? commandHandler.getConsumerCreationSpanContext()
                : null;
        final Span currentSpan = CommandContext.createSpan(
                tracer, command, spanContext, followsFromSpanContext, getClass().getSimpleName());
        TracingHelper.TAG_ADAPTER_INSTANCE_ID.set(currentSpan, adapterInstanceId);
        KafkaTracingHelper.TAG_OFFSET.set(currentSpan, record.offset());

        final KafkaBasedCommandContext commandContext = new KafkaBasedCommandContext(
                command, commandResponseSender, currentSpan);

        return tenantClient.get(command.getTenant(), spanContext)
                .recover(t -> {
                    // typed as Throwable because the two branches create unrelated exception types
                    final Throwable mappedException;
                    if (ServiceInvocationException.extractStatusCode(t) == 404) {
                        // tenant not found: reject the command
                        mappedException = new TenantDisabledOrNotRegisteredException(command.getTenant(), 404);
                        commandContext.reject(mappedException);
                    } else {
                        // any other error is mapped to a 503 and the command is released
                        mappedException = new ServerErrorException(
                                command.getTenant(), 503, "error retrieving tenant configuration", t);
                        commandContext.release(mappedException);
                    }
                    return Future.failedFuture(mappedException);
                })
                .compose(tenantConfig -> {
                    commandContext.put("tenant-config", tenantConfig);
                    if (commandHandler != null) {
                        LOG.trace("using [{}] for received command [{}]", commandHandler, command);
                        return commandHandler.handleCommand(commandContext);
                    }
                    LOG.info("no command handler found for command [{}]", command);
                    final NoConsumerException exception = new NoConsumerException("no command handler found for command");
                    commandContext.release(exception);
                    return Future.failedFuture(exception);
                });
    }
}

