/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.kafka;

import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import io.atleon.kafka.KafkaConfigSource;
import io.atleon.kafka.KafkaSenderResult;
import io.atleon.util.ConfigLoading;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class AloKafkaSender<K, V> {
    public static final String CONFIG_PREFIX = "kafka.sender.";
    public static final String MAX_IN_FLIGHT_PER_SEND_CONFIG = "kafka.sender.max.in.flight.per.send";
    public static final String STOP_ON_ERROR_CONFIG = "stop.on.error";
    private static final int DEFAULT_MAX_IN_FLIGHT_PER_SEND = 256;
    private static final boolean DEFAULT_STOP_ON_ERROR = false;
    private static final Map<String, AtomicLong> COUNTS_BY_CLIENT_ID = new ConcurrentHashMap<String, AtomicLong>();
    private static final Map<String, Scheduler> SCHEDULERS_BY_CLIENT_ID = new ConcurrentHashMap<String, Scheduler>();
    private final Mono<KafkaSender<K, V>> futureKafkaSender;

    private AloKafkaSender(KafkaConfigSource configSource) {
        this.futureKafkaSender = ((Mono)configSource.create()).map(AloKafkaSender::newSenderOptions).map(KafkaSender::create).cache();
    }

    public static <K, V> AloKafkaSender<K, V> from(KafkaConfigSource configSource) {
        return new AloKafkaSender<K, V>(configSource);
    }

    public static <V> AloKafkaSender<Object, V> forValues(KafkaConfigSource configSource) {
        return new AloKafkaSender(configSource);
    }

    public Flux<KafkaSenderResult<ProducerRecord<K, V>>> sendRecords(Publisher<ProducerRecord<K, V>> records) {
        return this.futureKafkaSender.flatMapMany(sender -> this.sendRecords((KafkaSender<K, V>)sender, records));
    }

    public Function<Publisher<V>, Flux<KafkaSenderResult<V>>> sendValues(String topic, Function<? super V, ? extends K> valueToKey) {
        return values -> this.sendValues((Publisher<V>)values, topic, valueToKey);
    }

    public Flux<KafkaSenderResult<V>> sendValues(Publisher<V> values, String topic, Function<? super V, ? extends K> valueToKey) {
        return this.futureKafkaSender.flatMapMany(sender -> this.sendValues((KafkaSender<K, V>)sender, values, value -> topic, valueToKey));
    }

    public Function<Publisher<V>, Flux<KafkaSenderResult<V>>> sendValues(Function<? super V, String> valueToTopic, Function<? super V, ? extends K> valueToKey) {
        return values -> this.sendValues((Publisher<V>)values, valueToTopic, valueToKey);
    }

    public Flux<KafkaSenderResult<V>> sendValues(Publisher<V> values, Function<? super V, String> valueToTopic, Function<? super V, ? extends K> valueToKey) {
        return this.futureKafkaSender.flatMapMany(sender -> this.sendValues((KafkaSender<K, V>)sender, values, valueToTopic, valueToKey));
    }

    public Function<Publisher<Alo<V>>, AloFlux<KafkaSenderResult<V>>> sendAloValues(String topic, Function<? super V, ? extends K> valueToKey) {
        return aloValues -> this.sendAloValues((Publisher<Alo<V>>)aloValues, topic, valueToKey);
    }

    public AloFlux<KafkaSenderResult<V>> sendAloValues(Publisher<Alo<V>> aloValues, String topic, Function<? super V, ? extends K> valueToKey) {
        return this.sendAloValues(aloValues, (? super V value) -> topic, valueToKey);
    }

    public Function<Publisher<Alo<V>>, AloFlux<KafkaSenderResult<V>>> sendAloValues(Function<? super V, String> valueToTopic, Function<? super V, ? extends K> valueToKey) {
        return aloValues -> this.sendAloValues((Publisher<Alo<V>>)aloValues, valueToTopic, valueToKey);
    }

    public AloFlux<KafkaSenderResult<V>> sendAloValues(Publisher<Alo<V>> aloValues, Function<? super V, String> valueToTopic, Function<? super V, ? extends K> valueToKey) {
        return (AloFlux)this.futureKafkaSender.flatMapMany(sender -> this.sendAloValues((KafkaSender<K, V>)sender, aloValues, valueToTopic, valueToKey)).as(AloFlux::wrap);
    }

    public AloFlux<KafkaSenderResult<ProducerRecord<K, V>>> sendAloRecords(Publisher<Alo<ProducerRecord<K, V>>> aloRecords) {
        return (AloFlux)this.futureKafkaSender.flatMapMany(sender -> this.sendAloRecords((KafkaSender<K, V>)sender, aloRecords)).as(AloFlux::wrap);
    }

    private Flux<KafkaSenderResult<V>> sendValues(KafkaSender<K, V> sender, Publisher<V> values, Function<? super V, String> valueToTopic, Function<? super V, ? extends K> valueToKey) {
        return Flux.from(values).map(aloValue -> this.createSenderRecordOfValue(aloValue, valueToTopic, valueToKey)).transform(arg_0 -> sender.send(arg_0)).map(KafkaSenderResult::fromSenderResult);
    }

    private SenderRecord<K, V, V> createSenderRecordOfValue(V value, Function<? super V, String> valueToTopic, Function<? super V, ? extends K> valueToKey) {
        String topic = valueToTopic.apply(value);
        K key = valueToKey.apply(value);
        ProducerRecord producerRecord = new ProducerRecord(topic, null, key, value);
        return SenderRecord.create((ProducerRecord)producerRecord, value);
    }

    private Flux<KafkaSenderResult<ProducerRecord<K, V>>> sendRecords(KafkaSender<K, V> sender, Publisher<ProducerRecord<K, V>> records) {
        return Flux.from(records).map(record -> SenderRecord.create((ProducerRecord)record, (Object)record)).transform(arg_0 -> sender.send(arg_0)).map(KafkaSenderResult::fromSenderResult);
    }

    private Flux<Alo<KafkaSenderResult<V>>> sendAloValues(KafkaSender<K, V> sender, Publisher<Alo<V>> aloValues, Function<? super V, String> valueToTopic, Function<? super V, ? extends K> valueToKey) {
        return AloFlux.toFlux(aloValues).map(aloValue -> this.createSenderRecordOfAloValue((Alo<V>)aloValue, valueToTopic, valueToKey)).transform(arg_0 -> sender.send(arg_0)).map(KafkaSenderResult::fromSenderResultOfAlo);
    }

    private SenderRecord<K, V, Alo<V>> createSenderRecordOfAloValue(Alo<V> aloValue, Function<? super V, String> valueToTopic, Function<? super V, ? extends K> valueToKey) {
        Object value = aloValue.get();
        String topic = valueToTopic.apply(value);
        K key = valueToKey.apply(value);
        ProducerRecord producerRecord = new ProducerRecord(topic, null, key, value);
        return SenderRecord.create((ProducerRecord)producerRecord, aloValue);
    }

    private Flux<Alo<KafkaSenderResult<ProducerRecord<K, V>>>> sendAloRecords(KafkaSender<K, V> sender, Publisher<Alo<ProducerRecord<K, V>>> aloRecords) {
        return AloFlux.toFlux(aloRecords).map(aloRecord -> SenderRecord.create((ProducerRecord)((ProducerRecord)aloRecord.get()), (Object)aloRecord)).transform(arg_0 -> sender.send(arg_0)).map(KafkaSenderResult::fromSenderResultOfAlo);
    }

    private static <K, V> SenderOptions<K, V> newSenderOptions(Map<String, Object> config) {
        HashMap<String, Object> options = new HashMap<String, Object>();
        options.put(MAX_IN_FLIGHT_PER_SEND_CONFIG, config.getOrDefault(MAX_IN_FLIGHT_PER_SEND_CONFIG, 256));
        options.put(STOP_ON_ERROR_CONFIG, config.getOrDefault(STOP_ON_ERROR_CONFIG, false));
        HashMap<String, Object> producerConfig = new HashMap<String, Object>(config);
        options.keySet().forEach(producerConfig::remove);
        String clientId = (String)ConfigLoading.loadOrThrow(config, (String)"client.id", Object::toString);
        producerConfig.put("client.id", clientId + "-" + AloKafkaSender.nextClientIdCount(clientId));
        SenderOptions senderOptions = SenderOptions.create(producerConfig);
        senderOptions.maxInFlight(((Integer)ConfigLoading.loadOrThrow(options, (String)MAX_IN_FLIGHT_PER_SEND_CONFIG, Integer::valueOf)).intValue());
        senderOptions.stopOnError(((Boolean)ConfigLoading.loadOrThrow(options, (String)STOP_ON_ERROR_CONFIG, Boolean::valueOf)).booleanValue());
        senderOptions.scheduler(AloKafkaSender.schedulerForClient(clientId));
        return senderOptions;
    }

    private static long nextClientIdCount(String clientId) {
        return COUNTS_BY_CLIENT_ID.computeIfAbsent(clientId, key -> new AtomicLong()).incrementAndGet();
    }

    private static Scheduler schedulerForClient(String clientId) {
        return SCHEDULERS_BY_CLIENT_ID.computeIfAbsent(clientId, AloKafkaSender::newSchedulerForClient);
    }

    private static Scheduler newSchedulerForClient(String clientId) {
        return Schedulers.newSingle((String)(AloKafkaSender.class.getSimpleName() + "-" + clientId));
    }
}

