/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.application.client.kafka.impl;

import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracerFactory;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.application.client.kafka.KafkaApplicationClient;
import org.eclipse.hono.application.client.kafka.KafkaMessageContext;
import org.eclipse.hono.application.client.kafka.impl.KafkaBasedCommandSender;
import org.eclipse.hono.application.client.kafka.impl.KafkaBasedDownstreamMessageConsumer;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;

/**
 * A Kafka based implementation of {@link KafkaApplicationClient}.
 * <p>
 * Command sending is inherited from {@link KafkaBasedCommandSender}. This class adds the
 * creation of consumers for telemetry, event and command response messages, each of which
 * is backed by a Vert.x {@link KafkaConsumer}.
 */
public class KafkaApplicationClientImpl extends KafkaBasedCommandSender implements KafkaApplicationClient {

    private final Vertx vertx;
    private final KafkaConsumerConfigProperties consumerConfig;
    /**
     * Optional factory for the Kafka consumers; when {@code null}, consumers are created
     * from {@link #consumerConfig}.
     */
    private Supplier<KafkaConsumer<String, Buffer>> kafkaConsumerSupplier;

    /**
     * Creates a new client that uses a no-op tracer.
     *
     * @param vertx The Vert.x instance used to create Kafka consumers.
     * @param consumerConfig The Kafka consumer configuration.
     * @param producerFactory The factory handed to the command sender super class.
     * @param producerConfig The Kafka producer configuration.
     * @throws NullPointerException if vertx, consumerConfig or producerConfig is {@code null}.
     * @throws IllegalArgumentException if the consumer or the producer configuration
     *                                  reports that it is not configured.
     */
    public KafkaApplicationClientImpl(
            final Vertx vertx,
            final KafkaConsumerConfigProperties consumerConfig,
            final KafkaProducerFactory<String, Buffer> producerFactory,
            final KafkaProducerConfigProperties producerConfig) {
        this(vertx, consumerConfig, producerFactory, producerConfig, NoopTracerFactory.create());
    }

    /**
     * Creates a new client.
     *
     * @param vertx The Vert.x instance used to create Kafka consumers.
     * @param consumerConfig The Kafka consumer configuration.
     * @param producerFactory The factory handed to the command sender super class.
     * @param producerConfig The Kafka producer configuration.
     * @param tracer The tracer handed to the command sender super class.
     * @throws NullPointerException if vertx, consumerConfig or producerConfig is {@code null}.
     *                              NOTE(review): the super class constructor may reject
     *                              further {@code null} arguments - confirm there.
     * @throws IllegalArgumentException if the consumer or the producer configuration
     *                                  reports that it is not configured.
     */
    public KafkaApplicationClientImpl(
            final Vertx vertx,
            final KafkaConsumerConfigProperties consumerConfig,
            final KafkaProducerFactory<String, Buffer> producerFactory,
            final KafkaProducerConfigProperties producerConfig,
            final Tracer tracer) {
        super(producerFactory, producerConfig, tracer);
        Objects.requireNonNull(vertx, "vertx");
        Objects.requireNonNull(consumerConfig, "consumerConfig");
        // checked explicitly so that the dereference below cannot fail with an anonymous NPE
        Objects.requireNonNull(producerConfig, "producerConfig");
        if (!consumerConfig.isConfigured() || !producerConfig.isConfigured()) {
            throw new IllegalArgumentException("No Kafka configuration found!");
        }
        this.vertx = vertx;
        this.consumerConfig = consumerConfig;
    }

    /**
     * Creates a consumer for the telemetry messages of a tenant.
     *
     * @param tenantId The tenant to consume messages for.
     * @param messageHandler The handler passed on to the created consumer.
     * @param closeHandler The close handler passed on to the created consumer; may be
     *                     {@code null}, in which case a no-op handler is used.
     * @return The future returned by the underlying consumer factory method.
     * @throws NullPointerException if tenantId or messageHandler is {@code null}.
     */
    @Override
    public Future<MessageConsumer> createTelemetryConsumer(
            final String tenantId,
            final Handler<DownstreamMessage<KafkaMessageContext>> messageHandler,
            final Handler<Throwable> closeHandler) {
        return createKafkaBasedDownstreamMessageConsumer(tenantId, HonoTopic.Type.TELEMETRY, messageHandler,
                closeHandler);
    }

    /**
     * Creates a consumer for the event messages of a tenant.
     *
     * @param tenantId The tenant to consume messages for.
     * @param messageHandler The handler passed on to the created consumer.
     * @param closeHandler The close handler passed on to the created consumer; may be
     *                     {@code null}, in which case a no-op handler is used.
     * @return The future returned by the underlying consumer factory method.
     * @throws NullPointerException if tenantId or messageHandler is {@code null}.
     */
    @Override
    public Future<MessageConsumer> createEventConsumer(
            final String tenantId,
            final Handler<DownstreamMessage<KafkaMessageContext>> messageHandler,
            final Handler<Throwable> closeHandler) {
        return createKafkaBasedDownstreamMessageConsumer(tenantId, HonoTopic.Type.EVENT, messageHandler,
                closeHandler);
    }

    /**
     * Creates a consumer for the command response messages of a tenant.
     *
     * @param tenantId The tenant to consume messages for.
     * @param replyId Currently not evaluated by this implementation.
     *                NOTE(review): no filtering by reply ID happens in this class - confirm
     *                whether callers rely on such filtering.
     * @param messageHandler The handler passed on to the created consumer.
     * @param closeHandler The close handler passed on to the created consumer; may be
     *                     {@code null}, in which case a no-op handler is used.
     * @return The future returned by the underlying consumer factory method.
     * @throws NullPointerException if tenantId or messageHandler is {@code null}.
     */
    public Future<MessageConsumer> createCommandResponseConsumer(
            final String tenantId,
            final String replyId,
            final Handler<DownstreamMessage<KafkaMessageContext>> messageHandler,
            final Handler<Throwable> closeHandler) {
        return createKafkaBasedDownstreamMessageConsumer(tenantId, HonoTopic.Type.COMMAND_RESPONSE, messageHandler,
                closeHandler);
    }

    /**
     * Sets the factory that supplies the Kafka consumer for subsequently created message
     * consumers, replacing the default of creating one from the consumer configuration.
     * <p>
     * Package-private; presumably intended for injecting mock consumers in tests.
     *
     * @param kafkaConsumerSupplier The supplier to obtain Kafka consumers from.
     * @throws NullPointerException if kafkaConsumerSupplier is {@code null}.
     */
    void setKafkaConsumerFactory(final Supplier<KafkaConsumer<String, Buffer>> kafkaConsumerSupplier) {
        this.kafkaConsumerSupplier = Objects.requireNonNull(kafkaConsumerSupplier, "kafkaConsumerSupplier");
    }

    /**
     * Creates a downstream message consumer for the given tenant and topic type.
     *
     * @param tenantId The tenant to consume messages for.
     * @param type The type of topic to consume from.
     * @param messageHandler The handler passed on to the created consumer.
     * @param closeHandler The close handler, or {@code null} to use a no-op handler.
     * @return The future returned by {@code KafkaBasedDownstreamMessageConsumer.create}.
     * @throws NullPointerException if tenantId, type or messageHandler is {@code null}.
     */
    private Future<MessageConsumer> createKafkaBasedDownstreamMessageConsumer(
            final String tenantId,
            final HonoTopic.Type type,
            final Handler<DownstreamMessage<KafkaMessageContext>> messageHandler,
            final Handler<Throwable> closeHandler) {
        Objects.requireNonNull(tenantId, "tenantId");
        Objects.requireNonNull(type, "type");
        Objects.requireNonNull(messageHandler, "messageHandler");

        // Prefer a consumer from the injected supplier; if no supplier is set (or it yields
        // null) fall back to a consumer built from the configuration for this topic type.
        final KafkaConsumer<String, Buffer> kafkaConsumer = Optional.ofNullable(kafkaConsumerSupplier)
                .map(Supplier::get)
                .orElseGet(() -> KafkaConsumer.create(vertx, consumerConfig.getConsumerConfig(type.toString())));

        // The close handler is optional for callers, so substitute a no-op when it is absent.
        final Handler<Throwable> effectiveCloseHandler = closeHandler != null ? closeHandler : t -> { };

        return KafkaBasedDownstreamMessageConsumer.create(tenantId, type, kafkaConsumer, consumerConfig,
                messageHandler, effectiveCloseHandler);
    }
}

