/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.polling.reactive;

import io.atleon.polling.Pollable;
import io.atleon.polling.Polled;
import io.atleon.polling.reactive.Poller;
import io.atleon.polling.reactive.PollingEventLoop;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * {@link Poller} implementation that wires a {@link Pollable} to a {@link PollingEventLoop}
 * running on a dedicated scheduler, and exposes the polled batches as a {@link Flux}.
 *
 * <p>Batches are pushed by the event loop into a unicast sink with a backpressure buffer;
 * downstream demand is forwarded to the event loop through {@code doOnRequest}.
 *
 * @param <P> first type parameter of the {@link Pollable} and of each {@link Polled} element
 * @param <O> second type parameter of the {@link Pollable} and of each {@link Polled} element
 */
public class PollerImp<P, O> implements Poller<P, O> {
    private final Pollable<P, O> pollable;
    private final Scheduler scheduler;
    private final Sinks.Many<Collection<Polled<P, O>>> sink;
    private final PollingEventLoop<P, O> eventLoop;

    /**
     * Creates the poller, its sink, its scheduler and its event loop, then starts the scheduler.
     *
     * @param pollable        source handed to the event loop
     * @param pollingInterval interval handed to the event loop
     */
    protected PollerImp(Pollable<P, O> pollable, Duration pollingInterval) {
        this.pollable = pollable;
        this.sink = Sinks.many().unicast().onBackpressureBuffer();
        // Threads for this scheduler come from EventThreadFactory so they carry the
        // "reactive-polling-N" name and the uncaught-exception handler defined below.
        this.scheduler = Schedulers.newSingle(new EventThreadFactory());
        this.eventLoop = new PollingEventLoop<>(this.scheduler, pollable, pollingInterval, this.sink);
        this.scheduler.start();
    }

    /**
     * @return the {@link Pollable} this poller was constructed with
     */
    @Override
    public Pollable<P, O> getPollable() {
        return this.pollable;
    }

    /**
     * Exposes the sink as a {@link Flux}; every downstream request amount is passed to
     * {@link PollingEventLoop#onRequest}.
     *
     * @return the flux of polled batches
     */
    @Override
    public Flux<Collection<Polled<P, O>>> receive() {
        return this.sink.asFlux().doOnRequest(this.eventLoop::onRequest);
    }

    /**
     * Stops the event loop and disposes the scheduler once the stop signal terminates,
     * whichever way it terminates (complete, error or cancel).
     *
     * @return the {@link Mono} returned by {@link PollingEventLoop#stop()}, with scheduler
     *         disposal attached
     */
    @Override
    public Mono<Void> close() {
        return this.eventLoop.stop().doFinally(signal -> this.scheduler.dispose());
    }

    /**
     * Uncaught-exception handler installed on every thread created by {@link EventThreadFactory}.
     *
     * <p>Reports the failure on {@code System.err} together with the throwable's stack trace so
     * the cause of a worker crash is not lost.
     *
     * @param t thread that terminated because of the exception
     * @param e the uncaught exception
     */
    static void defaultUncaughtException(Thread t, Throwable e) {
        // Thread#getThreadGroup is documented to return null for a terminated thread; guard so
        // the handler itself can never fail with a NullPointerException.
        ThreadGroup group = t.getThreadGroup();
        String groupName = group == null ? "<none>" : group.getName();
        System.err.println("Polling worker in group " + groupName + " failed with an uncaught exception");
        e.printStackTrace(System.err);
    }

    /**
     * Creates {@link EmitterThread}s named {@code reactive-polling-<n>}, where {@code n} comes
     * from a counter shared by all factory instances.
     */
    static final class EventThreadFactory implements ThreadFactory {
        static final String PREFIX = "reactive-polling";
        static final AtomicLong COUNTER_REFERENCE = new AtomicLong();

        EventThreadFactory() {
        }

        /**
         * @param runnable task the new thread will run
         * @return a new, not yet started {@link EmitterThread} with the polling
         *         uncaught-exception handler installed
         */
        @Override
        public Thread newThread(Runnable runnable) {
            String newThreadName = PREFIX + "-" + COUNTER_REFERENCE.incrementAndGet();
            EmitterThread t = new EmitterThread(runnable, newThreadName);
            t.setUncaughtExceptionHandler(PollerImp::defaultUncaughtException);
            return t;
        }

        /**
         * Dedicated {@link Thread} subclass for threads produced by this factory; it adds no
         * behavior of its own.
         */
        static final class EmitterThread extends Thread {
            EmitterThread(Runnable target, String name) {
                super(target, name);
            }
        }
    }
}

