/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.polling.reactive;

import io.atleon.polling.Pollable;
import io.atleon.polling.reactive.Poller;
import io.atleon.polling.reactive.PollerOptions;
import io.atleon.polling.reactive.PollingReceiver;
import io.atleon.polling.reactive.ReceiverRecord;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class PollingReceiverImp<P, O>
implements PollingReceiver<P, O> {
    private final Pollable<P, O> pollable;
    private final PollerOptions pollerOptions;
    private Poller<P, O> poller;

    protected PollingReceiverImp(Pollable<P, O> pollable, PollerOptions pollerOptions) {
        this.pollable = pollable;
        this.pollerOptions = pollerOptions;
    }

    @Override
    public Flux<ReceiverRecord<P, O>> receive() {
        return this.withPoller((scheduler, handler) -> handler.receive().publishOn(scheduler, 1).flatMapIterable(it -> it).map(r -> new ReceiverRecord(r, handler.getPollable())));
    }

    private <T> Flux<T> withPoller(BiFunction<Scheduler, Poller<P, O>, Flux<T>> function) {
        return Flux.usingWhen((Publisher)Mono.fromCallable(() -> {
            this.poller = Poller.create(this.pollable, this.pollerOptions.getPollingInterval());
            return this.poller;
        }), poller -> Flux.using(() -> Schedulers.single((Scheduler)this.pollerOptions.getSchedulerSupplier().get()), scheduler -> (Publisher)function.apply((Scheduler)scheduler, (Poller)poller), Scheduler::dispose), p -> p.close().doFinally(s -> {
            this.poller = null;
        }));
    }
}

