/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.application.client.kafka.impl;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tag;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.eclipse.hono.application.client.CommandSender;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.kafka.KafkaMessageContext;
import org.eclipse.hono.application.client.kafka.impl.KafkaDownstreamMessage;
import org.eclipse.hono.client.SendMessageTimeoutException;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender;
import org.eclipse.hono.client.kafka.producer.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.tracing.TracingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBasedCommandSender
extends AbstractKafkaBasedMessageSender
implements CommandSender<KafkaMessageContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaBasedCommandSender.class);
    private static final long DEFAULT_COMMAND_TIMEOUT_IN_MS = 10000L;
    private final Vertx vertx;
    private final KafkaConsumerConfigProperties consumerConfig;
    private final ConcurrentHashMap<String, HonoKafkaConsumer> commandResponseConsumers = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, ExpiringCommandPromise>> pendingCommandResponses = new ConcurrentHashMap();
    private Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier;
    private Supplier<String> correlationIdSupplier = () -> UUID.randomUUID().toString();

    public KafkaBasedCommandSender(Vertx vertx, KafkaConsumerConfigProperties consumerConfig, KafkaProducerFactory<String, Buffer> producerFactory, KafkaProducerConfigProperties producerConfig, Tracer tracer) {
        super(producerFactory, "command-sender", producerConfig, tracer);
        this.vertx = Objects.requireNonNull(vertx);
        this.consumerConfig = Objects.requireNonNull(consumerConfig);
    }

    public Future<Void> stop() {
        List stopKafkaClientsTracker = this.commandResponseConsumers.values().stream().map(HonoKafkaConsumer::stop).collect(Collectors.toList());
        this.commandResponseConsumers.clear();
        stopKafkaClientsTracker.add(super.stop());
        return CompositeFuture.join(stopKafkaClientsTracker).mapEmpty();
    }

    public Future<Void> sendAsyncCommand(String tenantId, String deviceId, String command, String contentType, Buffer data, String correlationId, String replyId, Map<String, Object> properties, SpanContext context) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(command);
        Objects.requireNonNull(correlationId);
        return this.sendCommand(tenantId, deviceId, command, contentType, data, correlationId, properties, true, "send command", context);
    }

    public Future<Void> sendOneWayCommand(String tenantId, String deviceId, String command, String contentType, Buffer data, Map<String, Object> properties, SpanContext context) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(command);
        return this.sendCommand(tenantId, deviceId, command, contentType, data, null, properties, false, "send one-way command", context);
    }

    public Future<DownstreamMessage<KafkaMessageContext>> sendCommand(String tenantId, String deviceId, String command, String contentType, Buffer data, String replyId, Map<String, Object> properties, Duration timeout, SpanContext context) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(command);
        long timeoutInMs = Optional.ofNullable(timeout).map(t -> {
            if (t.isNegative()) {
                throw new IllegalArgumentException("command timeout duration must be >= 0");
            }
            return t.toMillis();
        }).orElse(10000L);
        String correlationId = this.correlationIdSupplier.get();
        Span span = TracingHelper.buildChildSpan((Tracer)this.tracer, (SpanContext)context, (String)"send command and receive response", (String)((Object)((Object)this)).getClass().getSimpleName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag((Tag)TracingHelper.TAG_TENANT_ID, (Object)tenantId).withTag((Tag)TracingHelper.TAG_DEVICE_ID, (Object)deviceId).withTag((Tag)TracingHelper.TAG_CORRELATION_ID, (Object)correlationId).start();
        ExpiringCommandPromise expiringCommandPromise = new ExpiringCommandPromise(correlationId, timeoutInMs, (Handler<Void>)((Handler)x -> this.removePendingCommandResponse(tenantId, correlationId)), span);
        this.subscribeForCommandResponse(tenantId, span).compose(ok -> {
            this.pendingCommandResponses.computeIfAbsent(tenantId, k -> new ConcurrentHashMap()).put(correlationId, expiringCommandPromise);
            return this.sendCommand(tenantId, deviceId, command, contentType, data, correlationId, properties, true, "send command", span.context()).onSuccess(sent -> {
                LOGGER.debug("sent command [correlation-id: {}], waiting for response", (Object)correlationId);
                span.log("sent command, waiting for response");
            }).onFailure(error -> {
                LOGGER.debug("error sending command", error);
                if (!expiringCommandPromise.future().isComplete()) {
                    TracingHelper.logError((Span)span, (String)"error sending command", (Throwable)error);
                }
                this.removePendingCommandResponse(tenantId, correlationId);
                expiringCommandPromise.tryCompleteAndCancelTimer((AsyncResult<DownstreamMessage<KafkaMessageContext>>)Future.failedFuture((Throwable)error));
            });
        });
        return expiringCommandPromise.future().onComplete(o -> span.finish());
    }

    void setKafkaConsumerSupplier(Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier) {
        this.kafkaConsumerSupplier = Objects.requireNonNull(kafkaConsumerSupplier);
    }

    void setCorrelationIdSupplier(Supplier<String> correlationIdSupplier) {
        this.correlationIdSupplier = Objects.requireNonNull(correlationIdSupplier);
    }

    private Future<Void> sendCommand(String tenantId, String deviceId, String command, String contentType, Buffer data, String correlationId, Map<String, Object> properties, boolean responseRequired, String spanOperationName, SpanContext context) {
        HonoTopic topic = new HonoTopic(HonoTopic.Type.COMMAND, tenantId);
        Map<String, Object> headerProperties = this.getHeaderProperties(deviceId, command, contentType, correlationId, responseRequired, properties);
        return this.sendAndWaitForOutcome(topic.toString(), tenantId, deviceId, data, headerProperties, spanOperationName, context);
    }

    private Map<String, Object> getHeaderProperties(String deviceId, String subject, String contentType, String correlationId, boolean responseRequired, Map<String, Object> properties) {
        Map props = Optional.ofNullable(properties).map(HashMap::new).orElseGet(HashMap::new);
        props.put("device_id", deviceId);
        props.put("subject", subject);
        props.put("content-type", Objects.nonNull(contentType) ? contentType : "application/octet-stream");
        Optional.ofNullable(correlationId).ifPresent(id -> props.put("correlation-id", id));
        props.put("response-required", responseRequired);
        return props;
    }

    private Handler<DownstreamMessage<KafkaMessageContext>> getCommandResponseHandler(String tenantId) {
        return message -> {
            if (message.getCorrelationId() == null) {
                LOGGER.trace("ignoring received command response - no correlation id set [tenant: {}]", (Object)tenantId);
                return;
            }
            this.removePendingCommandResponse(tenantId, message.getCorrelationId()).ifPresentOrElse(expiringCommandPromise -> expiringCommandPromise.tryCompleteAndCancelTimer((AsyncResult<DownstreamMessage<KafkaMessageContext>>)this.mapResponseResult((DownstreamMessage<KafkaMessageContext>)message)), () -> LOGGER.trace("ignoring received command response - no response pending [tenant: {}, correlation-id: {}]", (Object)tenantId, (Object)message.getCorrelationId()));
        };
    }

    private Future<DownstreamMessage<KafkaMessageContext>> mapResponseResult(DownstreamMessage<KafkaMessageContext> message) {
        int status = Optional.ofNullable(message.getStatus()).orElseGet(() -> {
            LOGGER.warn("response message has no status code header [tenant ID: {}, device ID: {}, correlation ID: {}]", new Object[]{message.getTenantId(), message.getDeviceId(), message.getCorrelationId()});
            return 500;
        });
        if (StatusCodeMapper.isSuccessful((Integer)status)) {
            return Future.succeededFuture(message);
        }
        String detailMessage = message.getPayload() != null && message.getPayload().length() > 0 ? message.getPayload().toString(StandardCharsets.UTF_8) : null;
        return Future.failedFuture((Throwable)StatusCodeMapper.from((int)status, detailMessage));
    }

    private Optional<ExpiringCommandPromise> removePendingCommandResponse(String tenantId, String correlationId) {
        return Optional.ofNullable(this.pendingCommandResponses.get(tenantId)).map(ids -> (ExpiringCommandPromise)ids.remove(correlationId));
    }

    private Future<Void> subscribeForCommandResponse(String tenantId, Span span) {
        if (this.commandResponseConsumers.get(tenantId) != null) {
            LOGGER.debug("command response consumer already exists for tenant [{}]", (Object)tenantId);
            span.log("command response consumer already exists");
            return Future.succeededFuture();
        }
        Map consumerConfig = this.consumerConfig.getConsumerConfig(HonoTopic.Type.COMMAND_RESPONSE.toString());
        String autoOffsetResetConfigValue = (String)consumerConfig.get("auto.offset.reset");
        if (autoOffsetResetConfigValue != null && !autoOffsetResetConfigValue.equals("latest")) {
            LOGGER.warn("[auto.offset.reset] value is set to other than [latest]. It will be ignored and internally set to [latest]");
        }
        consumerConfig.put("auto.offset.reset", "latest");
        consumerConfig.put("group.id", tenantId + "-" + UUID.randomUUID());
        String topic = new HonoTopic(HonoTopic.Type.COMMAND_RESPONSE, tenantId).toString();
        Handler recordHandler = record -> this.getCommandResponseHandler(tenantId).handle((Object)new KafkaDownstreamMessage((KafkaConsumerRecord<String, Buffer>)record));
        HonoKafkaConsumer consumer = new HonoKafkaConsumer(this.vertx, Set.of(topic), recordHandler, consumerConfig);
        Optional.ofNullable(this.kafkaConsumerSupplier).ifPresent(arg_0 -> ((HonoKafkaConsumer)consumer).setKafkaConsumerSupplier(arg_0));
        return consumer.start().recover(error -> {
            LOGGER.debug("error creating command response consumer for tenant [{}]", (Object)tenantId, error);
            TracingHelper.logError((Span)span, (String)"error creating command response consumer", (Throwable)error);
            return Future.failedFuture((Throwable)error);
        }).onSuccess(v -> {
            LOGGER.debug("created command response consumer for tenant [{}]", (Object)tenantId);
            span.log("created command response consumer");
            this.commandResponseConsumers.put(tenantId, consumer);
        });
    }

    private class ExpiringCommandPromise {
        private final Promise<DownstreamMessage<KafkaMessageContext>> promise = Promise.promise();
        private final Span span;
        private Long timerId;

        ExpiringCommandPromise(String correlationId, long timeoutInMs, Handler<Void> timeOutHandler, Span span) {
            Objects.requireNonNull(span);
            this.span = span;
            if (timeoutInMs > 0L) {
                this.timerId = KafkaBasedCommandSender.this.vertx.setTimer(timeoutInMs, id -> {
                    SendMessageTimeoutException error = new SendMessageTimeoutException("send command/wait for response timed out after " + timeoutInMs + "ms");
                    this.timerId = null;
                    LOGGER.debug("cancelling sending command [correlation-id: {}] and waiting for response after {} ms", (Object)correlationId, (Object)timeoutInMs);
                    TracingHelper.logError((Span)span, (Throwable)error);
                    this.promise.tryFail((Throwable)error);
                    Optional.ofNullable(timeOutHandler).ifPresent(handler -> handler.handle(null));
                });
            }
        }

        final void tryCompleteAndCancelTimer(AsyncResult<DownstreamMessage<KafkaMessageContext>> commandResponseResult) {
            Objects.requireNonNull(commandResponseResult);
            Optional.ofNullable(this.timerId).ifPresent(arg_0 -> ((Vertx)KafkaBasedCommandSender.this.vertx).cancelTimer(arg_0));
            if (commandResponseResult.succeeded()) {
                String correlationId = Optional.ofNullable((DownstreamMessage)commandResponseResult.result()).map(DownstreamMessage::getCorrelationId).orElse("");
                LOGGER.trace("received command response [correlation-id: {}]", (Object)correlationId);
                this.span.log("received command response");
                this.promise.tryComplete((Object)((DownstreamMessage)commandResponseResult.result()));
            } else {
                this.promise.tryFail(commandResponseResult.cause());
            }
        }

        final Future<DownstreamMessage<KafkaMessageContext>> future() {
            return this.promise.future();
        }
    }
}

