/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.producer;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaProducer;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.client.kafka.producer.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link KafkaProducerFactory} that keeps the producers it creates in a cache keyed by producer name.
 * <p>
 * A producer is created on the first request for a given name and handed out again on subsequent
 * requests for that name. A producer is dropped from the cache when it is closed via
 * {@link #closeProducer(String)} or when it reports an error that {@link #isFatalError(Throwable)}
 * classifies as fatal; in the latter case the producer is closed as well.
 *
 * @param <K> The type of the record keys.
 * @param <V> The type of the record values.
 */
public class CachingKafkaProducerFactory<K, V>
implements KafkaProducerFactory<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(CachingKafkaProducerFactory.class);

    /** The cached producers, keyed by producer name. */
    private final Map<String, KafkaProducer<K, V>> activeProducers = new ConcurrentHashMap<>();
    private final KafkaClientFactory kafkaClientFactory;
    /** Creates a new producer instance from a producer name and a producer configuration. */
    private final BiFunction<String, Map<String, String>, KafkaProducer<K, V>> producerInstanceSupplier;
    /** Optional metrics support; stays {@code null} until {@link #setMetricsSupport} is invoked. */
    private KafkaClientMetricsSupport metricsSupport;

    private CachingKafkaProducerFactory(Vertx vertx, BiFunction<String, Map<String, String>, KafkaProducer<K, V>> producerInstanceSupplier) {
        this.producerInstanceSupplier = Objects.requireNonNull(producerInstanceSupplier, "producerInstanceSupplier");
        this.kafkaClientFactory = new KafkaClientFactory(Objects.requireNonNull(vertx, "vertx"));
    }

    /**
     * Creates a factory whose producers are obtained via
     * {@code KafkaProducer.createShared(vertx, name, config)}.
     *
     * @param vertx The Vert.x instance to use.
     * @param <K> The type of the record keys.
     * @param <V> The type of the record values.
     * @return The new factory.
     * @throws NullPointerException if vertx is {@code null}.
     */
    public static <K, V> CachingKafkaProducerFactory<K, V> sharedFactory(Vertx vertx) {
        Objects.requireNonNull(vertx, "vertx");
        return new CachingKafkaProducerFactory<>(vertx, (name, config) -> KafkaProducer.createShared(vertx, name, config));
    }

    /**
     * Creates a factory whose producers are obtained via {@code KafkaProducer.create(vertx, config)}.
     *
     * @param vertx The Vert.x instance to use.
     * @param <K> The type of the record keys.
     * @param <V> The type of the record values.
     * @return The new factory.
     * @throws NullPointerException if vertx is {@code null}.
     */
    public static <K, V> CachingKafkaProducerFactory<K, V> nonSharedFactory(Vertx vertx) {
        Objects.requireNonNull(vertx, "vertx");
        return new CachingKafkaProducerFactory<>(vertx, (name, config) -> KafkaProducer.create(vertx, config));
    }

    /**
     * Creates a factory that obtains its producers from the given function, e.g. to inject
     * test doubles.
     *
     * @param vertx The Vert.x instance to use.
     * @param producerInstanceSupplier The function invoked with the producer name and the producer
     *                                 configuration whenever a new producer needs to be created.
     * @param <K> The type of the record keys.
     * @param <V> The type of the record values.
     * @return The new factory.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public static <K, V> CachingKafkaProducerFactory<K, V> testFactory(Vertx vertx, BiFunction<String, Map<String, String>, KafkaProducer<K, V>> producerInstanceSupplier) {
        return new CachingKafkaProducerFactory<>(vertx, producerInstanceSupplier);
    }

    /**
     * Sets the metrics support with which created producers get registered and closed producers
     * get unregistered.
     *
     * @param metricsSupport The metrics support or {@code null} to not (un)register producers.
     */
    @Override
    public final void setMetricsSupport(KafkaClientMetricsSupport metricsSupport) {
        this.metricsSupport = metricsSupport;
    }

    /**
     * Gets the cached producer for the given name, creating and caching it first if there is none.
     * <p>
     * A newly created producer gets an exception handler that evicts and closes the producer on
     * fatal errors, and it is registered with the metrics support (if one is set).
     *
     * @param producerName The name under which the producer is cached.
     * @param config The properties from which the producer configuration is derived.
     * @return The existing or newly created producer.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    @Override
    public KafkaProducer<K, V> getOrCreateProducer(String producerName, KafkaProducerConfigProperties config) {
        Objects.requireNonNull(producerName, "producerName");
        Objects.requireNonNull(config, "config");
        // remembers the instance created by *this* invocation (if any) so that only
        // newly created producers get registered with the metrics support below
        final AtomicReference<KafkaProducer<K, V>> createdProducer = new AtomicReference<>();
        final KafkaProducer<K, V> kafkaProducer = this.activeProducers.computeIfAbsent(producerName, name -> {
            final Map<String, String> producerConfig = config.getProducerConfig(name);
            final String clientId = producerConfig.get("client.id");
            final KafkaProducer<K, V> producer = this.producerInstanceSupplier.apply(name, producerConfig);
            createdProducer.set(producer);
            return producer.exceptionHandler(this.getExceptionHandler(name, producer, clientId));
        });
        if (this.metricsSupport != null && kafkaProducer == createdProducer.get()) {
            this.metricsSupport.registerKafkaProducer(kafkaProducer.unwrap());
        }
        return kafkaProducer;
    }

    /**
     * Gets or creates a producer like {@link #getOrCreateProducer(String, KafkaProducerConfigProperties)},
     * delegating to {@code KafkaClientFactory.createClientWithRetries} with the bootstrap servers
     * taken from the given configuration.
     *
     * @param producerName The name under which the producer is cached.
     * @param config The properties from which the producer configuration is derived.
     * @param retriesTimeout The timeout value handed to the client factory unchanged.
     * @return The future returned by the client factory.
     * @throws NullPointerException if producerName or config is {@code null}.
     */
    @Override
    public Future<KafkaProducer<K, V>> getOrCreateProducerWithRetries(String producerName, KafkaProducerConfigProperties config, Duration retriesTimeout) {
        Objects.requireNonNull(producerName, "producerName");
        Objects.requireNonNull(config, "config");
        final String bootstrapServersConfig = config.getBootstrapServers();
        return this.kafkaClientFactory.createClientWithRetries(
                () -> this.getOrCreateProducer(producerName, config),
                bootstrapServersConfig,
                retriesTimeout);
    }

    /**
     * Creates the exception handler for a producer: every error is logged; a fatal error
     * additionally evicts the producer from the cache and closes it.
     */
    private Handler<Throwable> getExceptionHandler(String producerName, KafkaProducer<K, V> producer, String clientId) {
        return t -> {
            if (CachingKafkaProducerFactory.isFatalError(t)) {
                LOG.error("fatal producer error occurred, closing producer [clientId: {}]", clientId, t);
                // Conditional removal: only evict the entry if it still refers to the failing
                // producer. An unconditional remove could evict a newer producer that has been
                // cached under the same name after this one had already been removed.
                // Relies on exceptionHandler(..) being fluent, i.e. the cached value is this instance.
                this.activeProducers.remove(producerName, producer);
                producer.close().onComplete(ar -> this.unregisterFromMetrics(producer));
            } else {
                LOG.error("producer error occurred [clientId: {}]", clientId, t);
            }
        };
    }

    /** Unregisters the given producer from the metrics support, if one is set. */
    private void unregisterFromMetrics(KafkaProducer<K, V> producer) {
        Optional.ofNullable(this.metricsSupport).ifPresent(ms -> ms.unregisterKafkaProducer(producer.unwrap()));
    }

    /**
     * Gets the producer cached under the given name without creating one.
     *
     * @param producerName The name to look up.
     * @return The cached producer or an empty optional if there is none.
     * @throws NullPointerException if producerName is {@code null}.
     */
    @Override
    public Optional<KafkaProducer<K, V>> getProducer(String producerName) {
        Objects.requireNonNull(producerName, "producerName");
        return Optional.ofNullable(this.activeProducers.get(producerName));
    }

    /**
     * Removes the producer cached under the given name and closes it. Once closing has completed
     * (successfully or not), the producer is unregistered from the metrics support, if one is set.
     *
     * @param producerName The name of the producer to close.
     * @return A future reflecting the outcome of closing the producer; already succeeded if no
     *         producer is cached under the given name.
     * @throws NullPointerException if producerName is {@code null}.
     */
    @Override
    public Future<Void> closeProducer(String producerName) {
        Objects.requireNonNull(producerName, "producerName");
        final KafkaProducer<K, V> producer = this.activeProducers.remove(producerName);
        if (producer == null) {
            return Future.succeededFuture();
        }
        final Promise<Void> promise = Promise.promise();
        producer.close(promise);
        return promise.future().onComplete(ar -> this.unregisterFromMetrics(producer));
    }

    /**
     * Checks whether the given error is one of the exception types after which this factory
     * closes the affected producer.
     *
     * @param error The error to check; {@code null} is not considered fatal.
     * @return {@code true} if the error is a {@link ProducerFencedException},
     *         {@link OutOfOrderSequenceException}, {@link AuthorizationException},
     *         {@link UnsupportedVersionException} or {@link UnsupportedForMessageFormatException}.
     */
    public static boolean isFatalError(Throwable error) {
        return error instanceof ProducerFencedException
                || error instanceof OutOfOrderSequenceException
                || error instanceof AuthorizationException
                || error instanceof UnsupportedVersionException
                || error instanceof UnsupportedForMessageFormatException;
    }
}

