/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.polling;

import io.atleon.core.Alo;
import io.atleon.core.AloComponentExtractor;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFactoryConfig;
import io.atleon.core.AloFlux;
import io.atleon.core.AloQueueingTransformer;
import io.atleon.polling.Pollable;
import io.atleon.polling.Polled;
import io.atleon.polling.reactive.PollerOptions;
import io.atleon.polling.reactive.PollingReceiver;
import io.atleon.polling.reactive.PollingSourceConfig;
import io.atleon.polling.reactive.ReceiverRecord;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

public class AloPollingReceiver<P, O> {
    private final Pollable<P, O> pollable;
    private final PollingSourceConfig config;
    private final Mono<ReceiveResources<P, O>> resourcesMono;

    private AloPollingReceiver(Pollable<P, O> pollable, PollingSourceConfig config) {
        this.pollable = pollable;
        this.config = config;
        this.resourcesMono = Mono.just(ReceiveResources.create(AloFactoryConfig.loadDefault(), config.getNackStrategy()));
    }

    public static <P, O> AloPollingReceiver<P, O> from(Pollable<P, O> pollable, PollingSourceConfig config) {
        return AloPollingReceiver.create(pollable, config);
    }

    public static <P, O> AloPollingReceiver<P, O> create(Pollable<P, O> pollable, PollingSourceConfig config) {
        return new AloPollingReceiver<P, O>(pollable, config);
    }

    public AloFlux<P> receivePayloads() {
        return ((AloFlux)this.resourcesMono.flatMapMany(r -> r.receive(PollingReceiver.create(this.pollable, this.buildPollerOptions()))).as(AloFlux::wrap)).map(Polled::getPayload);
    }

    private PollerOptions buildPollerOptions() {
        return PollerOptions.create(this.config.getPollingInterval(), () -> Schedulers.newSingle((String)AloPollingReceiver.class.getSimpleName()));
    }

    public static enum NackStrategy {
        EMIT(true, false),
        NACK(false, true),
        NACK_EMIT(true, true);

        private final boolean emit;
        private final boolean nack;

        private NackStrategy(boolean emit, boolean nack) {
            this.emit = emit;
            this.nack = nack;
        }

        public boolean isEmit() {
            return this.emit;
        }

        public boolean isNack() {
            return this.nack;
        }
    }

    private static final class ReceiveResources<P, O> {
        private final AloFactory<Polled<P, O>> aloFactory;
        private final NackStrategy nackStrategy;

        private ReceiveResources(AloFactory<Polled<P, O>> aloFactory, NackStrategy nackStrategy) {
            this.aloFactory = aloFactory;
            this.nackStrategy = nackStrategy;
        }

        static <P, O> ReceiveResources<P, O> create(AloFactory<Polled<P, O>> aloFactory, NackStrategy nackStrategy) {
            return new ReceiveResources<P, O>(aloFactory, nackStrategy);
        }

        public Flux<Alo<Polled<P, O>>> receive(PollingReceiver<P, O> receiver) {
            Sinks.Empty sink = Sinks.empty();
            return receiver.receive().transform(this.newAloQueueingTransformer(sink)).mergeWith((Publisher)sink.asMono());
        }

        private AloQueueingTransformer<ReceiverRecord<P, O>, Polled<P, O>> newAloQueueingTransformer(Sinks.Empty<?> sink) {
            AloComponentExtractor componentExtractor = AloComponentExtractor.composed(record -> () -> this.ack((ReceiverRecord<P, O>)record), record -> error -> this.nack(sink, (Throwable)error, (ReceiverRecord<P, O>)record), ReceiverRecord::getRecord);
            return AloQueueingTransformer.create((AloComponentExtractor)componentExtractor).withGroupExtractor(receiverRecord -> receiverRecord.getRecord().getGroup());
        }

        private void ack(ReceiverRecord<P, O> record) {
            record.getPollable().ack(record.getRecord().getOffset());
        }

        private void nack(Sinks.Empty<?> sink, Throwable throwable, ReceiverRecord<P, O> record) {
            if (this.nackStrategy.nack) {
                record.getPollable().nack(throwable, record.getRecord().getOffset());
            }
            if (this.nackStrategy.emit) {
                sink.tryEmitError(throwable);
            }
        }
    }
}

