/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import io.atleon.core.Alo;
import io.atleon.core.Deduplication;
import io.atleon.core.DeduplicationConfig;
import io.atleon.util.Defaults;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Publisher transformation that collapses items sharing the same deduplication key.
 *
 * <p>When the supplied {@link DeduplicationConfig} is enabled, upstream items are grouped by
 * {@link Deduplication#extractKey}, each group is cut off by the configured duration and
 * maximum size, and what was collected is folded into one item with
 * {@link Deduplication#reduceDuplicates}. When the config is not enabled, the upstream
 * publisher is returned as-is.
 *
 * @param <T> type of the items flowing through the transformed publisher
 */
final class DeduplicatingTransformer<T> implements Function<Publisher<T>, Publisher<T>> {

    /**
     * Scheduler used by the factory overloads that do not take one explicitly. It is named
     * after this class.
     */
    // NOTE(review): the (int) cast is retained because the declared type of
    // Defaults.THREAD_CAP is not visible from this file - drop it if that constant is an int.
    private static final Scheduler DEFAULT_SCHEDULER = Schedulers.newBoundedElastic(
        (int) Defaults.THREAD_CAP,
        Integer.MAX_VALUE,
        DeduplicatingTransformer.class.getSimpleName());

    private final DeduplicationConfig config;

    private final Deduplication<T> deduplication;

    private final Scheduler sourceScheduler;

    private DeduplicatingTransformer(
        DeduplicationConfig config,
        Deduplication<T> deduplication,
        Scheduler sourceScheduler
    ) {
        this.config = config;
        this.deduplication = deduplication;
        this.sourceScheduler = sourceScheduler;
    }

    /**
     * Creates a transformer that deduplicates plain items, running on the default scheduler.
     *
     * @param config        deduplication settings
     * @param deduplication key extraction and duplicate reduction for items of type {@code T}
     * @param <T>           item type
     * @return a new transformer
     */
    static <T> DeduplicatingTransformer<T> identity(DeduplicationConfig config, Deduplication<T> deduplication) {
        return identity(config, deduplication, DEFAULT_SCHEDULER);
    }

    /**
     * Creates a transformer that deduplicates plain items, running on the given scheduler.
     *
     * @param config          deduplication settings
     * @param deduplication   key extraction and duplicate reduction for items of type {@code T}
     * @param sourceScheduler scheduler from which the deduplication scheduler is derived
     * @param <T>             item type
     * @return a new transformer
     */
    static <T> DeduplicatingTransformer<T> identity(
        DeduplicationConfig config,
        Deduplication<T> deduplication,
        Scheduler sourceScheduler
    ) {
        return new DeduplicatingTransformer<>(config, deduplication, sourceScheduler);
    }

    /**
     * Creates a transformer that deduplicates {@link Alo}-wrapped items, running on the
     * default scheduler.
     *
     * @param config        deduplication settings
     * @param deduplication key extraction and duplicate reduction for the wrapped values
     * @param <T>           type of the value wrapped by each {@link Alo}
     * @return a new transformer over {@code Alo<T>} items
     */
    static <T> DeduplicatingTransformer<Alo<T>> alo(DeduplicationConfig config, Deduplication<T> deduplication) {
        return alo(config, deduplication, DEFAULT_SCHEDULER);
    }

    /**
     * Creates a transformer that deduplicates {@link Alo}-wrapped items, running on the given
     * scheduler. The supplied deduplication is adapted so that it operates on the wrappers.
     *
     * @param config          deduplication settings
     * @param deduplication   key extraction and duplicate reduction for the wrapped values
     * @param sourceScheduler scheduler from which the deduplication scheduler is derived
     * @param <T>             type of the value wrapped by each {@link Alo}
     * @return a new transformer over {@code Alo<T>} items
     */
    static <T> DeduplicatingTransformer<Alo<T>> alo(
        DeduplicationConfig config,
        Deduplication<T> deduplication,
        Scheduler sourceScheduler
    ) {
        Deduplication<Alo<T>> aloDeduplication = new AloDeduplication<>(deduplication);
        return new DeduplicatingTransformer<>(config, aloDeduplication, sourceScheduler);
    }

    /**
     * Applies deduplication to the given publisher if the config is enabled.
     *
     * @param publisher upstream publisher
     * @return the same {@code publisher} instance when deduplication is not enabled, otherwise
     *         a deduplicating view of it
     */
    @Override
    public Publisher<T> apply(Publisher<T> publisher) {
        if (!config.isEnabled()) {
            return publisher;
        }
        // The first signal itself is not inspected; the whole stream is handed to the
        // deduplication pipeline.
        return Flux.from(publisher)
            .switchOnFirst((firstSignal, source) -> source.transform(this::applyDeduplication));
    }

    /**
     * Builds the deduplication pipeline: publish on a single-worker scheduler, group by key,
     * deduplicate each group, and subscribe on that same scheduler.
     */
    private Flux<T> applyDeduplication(Publisher<T> publisher) {
        // The same single-worker scheduler is used for publishing, for the timed cut-off of
        // each group, and for subscribing.
        Scheduler singleWorker = Schedulers.single(sourceScheduler);

        Flux<GroupedFlux<Object, T>> groupsByKey = Flux.from(publisher)
            .publishOn(singleWorker, config.getDeduplicationSourcePrefetch())
            .groupBy(deduplication::extractKey);

        return groupsByKey
            .flatMap(group -> deduplicateGroup(group, singleWorker), config.getDeduplicationConcurrency())
            .subscribeOn(singleWorker);
    }

    /**
     * Reduces one keyed group to a single item. The group is limited first by the configured
     * deduplication duration (timed on {@code scheduler}) and then by the configured maximum
     * size; whatever passes those limits is folded with
     * {@link Deduplication#reduceDuplicates}.
     */
    private Mono<T> deduplicateGroup(GroupedFlux<Object, T> groupedFlux, Scheduler scheduler) {
        Flux<T> boundedGroup = groupedFlux
            .take(config.getDeduplicationDuration(), scheduler)
            .take(config.getMaxDeduplicationSize());
        return boundedGroup.reduce(deduplication::reduceDuplicates);
    }

    /**
     * Adapts a {@link Deduplication} of raw values so that it works on {@link Alo} wrappers.
     *
     * @param <T> type of the value wrapped by each {@link Alo}
     */
    private static final class AloDeduplication<T> implements Deduplication<Alo<T>> {

        private final Deduplication<T> delegate;

        public AloDeduplication(Deduplication<T> deduplication) {
            this.delegate = deduplication;
        }

        /** Extracts the key from the value held by the given {@link Alo}. */
        @Override
        public Object extractKey(Alo<T> alo) {
            T value = alo.get();
            return delegate.extractKey(value);
        }

        /** Combines two wrappers through {@code Alo.reduce}, using the delegate's reducer. */
        @Override
        public Alo<T> reduceDuplicates(Alo<T> alo1, Alo<T> alo2) {
            return alo1.reduce(delegate::reduceDuplicates, alo2);
        }
    }
}

