/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.polling;

import io.atleon.core.Alo;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFactoryConfig;
import io.atleon.core.AloFlux;
import io.atleon.core.OrderManagingAcknowledgementOperator;
import io.atleon.polling.Pollable;
import io.atleon.polling.Polled;
import io.atleon.polling.reactive.PollerOptions;
import io.atleon.polling.reactive.PollingReceiver;
import io.atleon.polling.reactive.PollingSourceConfig;
import io.atleon.polling.reactive.ReceiverRecord;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

public class AloPollingReceiver<P, O> {
    private final Pollable<P, O> pollable;
    private final PollingSourceConfig config;
    private final Mono<ReceiveResources<P, O>> resourcesMono;

    private AloPollingReceiver(Pollable<P, O> pollable, PollingSourceConfig config) {
        this.pollable = pollable;
        this.config = config;
        this.resourcesMono = Mono.just(ReceiveResources.create(AloFactoryConfig.loadDefault(), config.getNackStrategy()));
    }

    public static <P, O> AloPollingReceiver<P, O> from(Pollable<P, O> pollable, PollingSourceConfig config) {
        return new AloPollingReceiver<P, O>(pollable, config);
    }

    public AloFlux<P> receivePayloads() {
        return ((AloFlux)this.resourcesMono.flatMapMany(r -> r.receive(PollingReceiver.create(this.pollable, this.buildPollerOptions()))).as(AloFlux::wrap)).map(Polled::getPayload);
    }

    private PollerOptions buildPollerOptions() {
        return PollerOptions.create(this.config.getPollingInterval(), () -> Schedulers.newSingle((String)AloPollingReceiver.class.getSimpleName()));
    }

    private static final class ReceiveResources<P, O> {
        private final AloFactory<Polled<P, O>> aloFactory;
        private final NackStrategy nackStrategy;

        private ReceiveResources(AloFactory<Polled<P, O>> aloFactory, NackStrategy nackStrategy) {
            this.aloFactory = aloFactory;
            this.nackStrategy = nackStrategy;
        }

        static <P, O> ReceiveResources<P, O> create(AloFactory<Polled<P, O>> aloFactory, NackStrategy nackStrategy) {
            return new ReceiveResources<P, O>(aloFactory, nackStrategy);
        }

        public Flux<Alo<Polled<P, O>>> receive(PollingReceiver<P, O> receiver) {
            return receiver.receive().transform(this::createAloRecords);
        }

        private Flux<Alo<Polled<P, O>>> createAloRecords(Flux<ReceiverRecord<P, O>> records) {
            Sinks.Empty sink = Sinks.empty();
            return records.map(record -> this.aloFactory.create(record.getRecord(), () -> this.ack((ReceiverRecord<P, O>)record), t -> this.nack((Sinks.Empty<Alo<Polled<P, O>>>)sink, (Throwable)t, (ReceiverRecord<P, O>)record))).mergeWith((Publisher)sink.asMono()).transform(aloRecords -> new OrderManagingAcknowledgementOperator((Publisher)aloRecords, Polled::getGroup));
        }

        private void ack(ReceiverRecord<P, O> record) {
            record.getPollable().ack(record.getRecord().getOffset());
        }

        private void nack(Sinks.Empty<Alo<Polled<P, O>>> sink, Throwable throwable, ReceiverRecord<P, O> record) {
            if (this.nackStrategy.nack) {
                record.getPollable().nack(throwable, record.getRecord().getOffset());
            }
            if (this.nackStrategy.emit) {
                sink.tryEmitError(throwable);
            }
        }
    }

    public static enum NackStrategy {
        EMIT(true, false),
        NACK(false, true),
        NACK_EMIT(true, true);

        private final boolean emit;
        private final boolean nack;

        private NackStrategy(boolean emit, boolean nack) {
            this.emit = emit;
            this.nack = nack;
        }

        public boolean isEmit() {
            return this.emit;
        }

        public boolean isNack() {
            return this.nack;
        }
    }
}

