/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.kafka;

import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import io.atleon.core.SenderResult;
import io.atleon.kafka.ContextualProducerFactory;
import io.atleon.kafka.KafkaConfig;
import io.atleon.kafka.KafkaConfigSource;
import io.atleon.kafka.KafkaSenderResult;
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.internals.ProducerFactory;

public class AloKafkaSender<K, V>
implements Closeable {
    public static final String CONFIG_PREFIX = "kafka.sender.";
    public static final String AUTO_INCREMENT_CLIENT_ID_CONFIG = "kafka.sender.auto.increment.client.id";
    public static final String MAX_IN_FLIGHT_PER_SEND_CONFIG = "kafka.sender.max.in.flight.per.send";
    public static final String STOP_ON_ERROR_CONFIG = "kafka.sender.stop.on.error";
    private static final Logger LOGGER = LoggerFactory.getLogger(AloKafkaSender.class);
    private static final boolean DEFAULT_AUTO_INCREMENT_CLIENT_ID = false;
    private static final boolean DEFAULT_STOP_ON_ERROR = false;
    private static final Map<String, AtomicLong> COUNTS_BY_ID = new ConcurrentHashMap<String, AtomicLong>();
    private final Mono<SendResources<K, V>> futureResources;
    private final Sinks.Many<Long> closeSink = Sinks.many().multicast().directBestEffort();

    private AloKafkaSender(KafkaConfigSource configSource) {
        this.futureResources = ((Mono)configSource.create()).map(SendResources::fromConfig).cacheInvalidateWhen(client -> this.closeSink.asFlux().next().then(), SendResources::close);
    }

    public static <K, V> AloKafkaSender<K, V> from(KafkaConfigSource configSource) {
        return AloKafkaSender.create(configSource);
    }

    public static <K, V> AloKafkaSender<K, V> create(KafkaConfigSource configSource) {
        return new AloKafkaSender<K, V>(configSource);
    }

    public Function<Publisher<V>, Flux<KafkaSenderResult<V>>> sendValues(String topic, Function<? super V, ? extends K> valueToKey) {
        return values -> this.sendValues((Publisher<V>)values, topic, valueToKey);
    }

    public Flux<KafkaSenderResult<V>> sendValues(Publisher<V> values, String topic, Function<? super V, ? extends K> valueToKey) {
        return this.sendValues(values, (? super V value) -> topic, valueToKey);
    }

    public Function<Publisher<V>, Flux<KafkaSenderResult<V>>> sendValues(Function<? super V, String> valueToTopic, Function<? super V, ? extends K> valueToKey) {
        return values -> this.sendValues((Publisher<V>)values, valueToTopic, valueToKey);
    }

    public Flux<KafkaSenderResult<V>> sendValues(Publisher<V> values, Function<? super V, String> valueToTopic, Function<? super V, ? extends K> valueToKey) {
        Function recordCreator = this.newValueBasedRecordCreator(valueToTopic, valueToKey);
        return this.futureResources.flatMapMany(resources -> resources.send(values, recordCreator));
    }

    public Mono<KafkaSenderResult<ProducerRecord<K, V>>> sendRecord(ProducerRecord<K, V> record) {
        return this.sendRecords((Publisher<ProducerRecord<K, V>>)Flux.just(record)).next();
    }

    public Flux<KafkaSenderResult<ProducerRecord<K, V>>> sendRecords(Publisher<ProducerRecord<K, V>> records) {
        return this.futureResources.flatMapMany(resources -> resources.send(records, Function.identity()));
    }

    public Function<Publisher<Alo<V>>, AloFlux<KafkaSenderResult<V>>> sendAloValues(String topic, Function<? super V, ? extends K> valueToKey) {
        return aloValues -> this.sendAloValues((Publisher<Alo<V>>)aloValues, topic, valueToKey);
    }

    public AloFlux<KafkaSenderResult<V>> sendAloValues(Publisher<Alo<V>> aloValues, String topic, Function<? super V, ? extends K> valueToKey) {
        return this.sendAloValues(aloValues, (? super V value) -> topic, valueToKey);
    }

    public Function<Publisher<Alo<V>>, AloFlux<KafkaSenderResult<V>>> sendAloValues(Function<? super V, String> valueToTopic, Function<? super V, ? extends K> valueToKey) {
        return aloValues -> this.sendAloValues((Publisher<Alo<V>>)aloValues, valueToTopic, valueToKey);
    }

    public AloFlux<KafkaSenderResult<V>> sendAloValues(Publisher<Alo<V>> aloValues, Function<? super V, String> valueToTopic, Function<? super V, ? extends K> valueToKey) {
        Function recordCreator = this.newValueBasedRecordCreator(valueToTopic, valueToKey);
        return ((AloFlux)this.futureResources.flatMapMany(resources -> resources.sendAlos(aloValues, recordCreator)).as(AloFlux::wrap)).processFailure(SenderResult::isFailure, SenderResult::toError);
    }

    public AloFlux<KafkaSenderResult<ProducerRecord<K, V>>> sendAloRecords(Publisher<Alo<ProducerRecord<K, V>>> aloRecords) {
        return ((AloFlux)this.futureResources.flatMapMany(resources -> resources.sendAlos(aloRecords, Function.identity())).as(AloFlux::wrap)).processFailure(SenderResult::isFailure, SenderResult::toError);
    }

    public void close(Object reason) {
        LOGGER.info("Closing AloKafkaSender due to reason={}", reason);
        this.close();
    }

    @Override
    public void close() {
        this.closeSink.tryEmitNext((Object)System.currentTimeMillis());
    }

    private Function<V, ProducerRecord<K, V>> newValueBasedRecordCreator(Function<? super V, String> valueToTopic, Function<? super V, ? extends K> valueToKey) {
        return value -> new ProducerRecord((String)valueToTopic.apply((Object)value), null, valueToKey.apply((Object)value), value);
    }

    private static final class SendResources<K, V> {
        private final KafkaSender<K, V> sender;

        private SendResources(KafkaSender<K, V> sender) {
            this.sender = sender;
        }

        public static <K, V> SendResources<K, V> fromConfig(KafkaConfig config) {
            SenderOptions defaultOptions = SenderOptions.create(SendResources.newProducerConfig(config));
            SenderOptions senderOptions = defaultOptions.maxInFlight(config.loadInt(AloKafkaSender.MAX_IN_FLIGHT_PER_SEND_CONFIG).orElse(defaultOptions.maxInFlight()).intValue()).stopOnError(config.loadBoolean(AloKafkaSender.STOP_ON_ERROR_CONFIG).orElse(false).booleanValue());
            return new SendResources<K, V>(KafkaSender.create((ProducerFactory)ContextualProducerFactory.INSTANCE, (SenderOptions)senderOptions));
        }

        public <T> Flux<KafkaSenderResult<T>> send(Publisher<T> publisher, Function<T, ProducerRecord<K, V>> recordCreator) {
            return Flux.from(publisher).map(item -> SenderRecord.create((ProducerRecord)((ProducerRecord)recordCreator.apply(item)), (Object)item)).transform(arg_0 -> this.sender.send(arg_0)).map(KafkaSenderResult::fromSenderResult);
        }

        public <T> Flux<Alo<KafkaSenderResult<T>>> sendAlos(Publisher<Alo<T>> alos, Function<T, ProducerRecord<K, V>> recordCreator) {
            return AloFlux.toFlux(alos).map(alo -> SenderRecord.create((ProducerRecord)((ProducerRecord)recordCreator.apply(alo.get())), (Object)alo)).transform(arg_0 -> this.sender.send(arg_0)).map(KafkaSenderResult::fromSenderResultOfAlo);
        }

        public void close() {
            this.sender.close();
        }

        private static Map<String, Object> newProducerConfig(KafkaConfig config) {
            return config.modifyAndGetProperties(properties -> {
                properties.keySet().removeIf(key -> key.startsWith(AloKafkaSender.CONFIG_PREFIX));
                if (config.loadBoolean(AloKafkaSender.AUTO_INCREMENT_CLIENT_ID_CONFIG).orElse(false).booleanValue()) {
                    properties.computeIfPresent("client.id", (__, id) -> SendResources.incrementId(id.toString()));
                }
            });
        }

        private static String incrementId(String id) {
            return id + "-" + COUNTS_BY_ID.computeIfAbsent(id, __ -> new AtomicLong()).incrementAndGet();
        }
    }
}

