/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/**
 * Groups the elements of a {@link Publisher} into {@link List} batches and applies a
 * Publisher-producing mapping function to each batch.
 *
 * <p>Instances are obtained through {@link #create(int, Duration, int)}; all configuration is
 * held in final fields.
 */
public final class Batcher {

    /** Maximum number of elements per batch; values of 1 or less disable buffering. */
    private final int maxSize;

    /** Timeout handed to {@code bufferTimeout}; only consulted when {@code maxSize > 1}. */
    private final Duration maxDuration;

    /** Prefetch handed to {@code concatMap} or {@code publishOn} ahead of the mapping step. */
    private final int prefetch;

    private Batcher(int maxSize, Duration maxDuration, int prefetch) {
        this.maxSize = maxSize;
        this.maxDuration = maxDuration;
        this.prefetch = prefetch;
    }

    /**
     * Creates a batcher with the given configuration. No validation is performed here; the
     * duration is only checked when a mapping is applied with batching enabled.
     *
     * @param maxSize     maximum number of elements per batch; 1 or less means every element is
     *                    wrapped in its own singleton list
     * @param maxDuration timeout passed to {@code bufferTimeout} when {@code maxSize > 1}
     * @param prefetch    prefetch passed to the operator that precedes/performs the mapping
     * @return a new {@code Batcher}
     */
    public static Batcher create(int maxSize, Duration maxDuration, int prefetch) {
        return new Batcher(maxSize, maxDuration, prefetch);
    }

    /**
     * Batches the elements of {@code publisher} and maps every batch through {@code mapper}.
     *
     * <p>When {@code maxConcurrency} is 1 or less the batches are mapped with
     * {@code concatMap(mapper, prefetch)}. Otherwise the batches pass through
     * {@code publishOn(Schedulers.immediate(), prefetch)} and are then mapped with
     * {@code flatMap(mapper, maxConcurrency)}.
     *
     * @param publisher      source of elements to batch
     * @param mapper         function turning each batch into a publisher of results
     * @param maxConcurrency values above 1 select {@code flatMap} with that concurrency; other
     *                       values select {@code concatMap}
     * @param <T>            element type of the source
     * @param <R>            element type of the result
     * @return the flux of mapped results
     * @throws IllegalArgumentException if {@code maxSize > 1} and the configured duration is zero
     *                                  or negative
     */
    public <T, R> Flux<R> applyMapping(
            Publisher<T> publisher,
            Function<? super List<T>, ? extends Publisher<? extends R>> mapper,
            int maxConcurrency) {
        // Build the batched flux first so configuration errors surface before any operator is chained.
        Flux<List<T>> batches = bufferIntoBatches(publisher);

        if (maxConcurrency > 1) {
            return batches
                    .publishOn(Schedulers.immediate(), prefetch)
                    .flatMap(mapper, maxConcurrency);
        }
        return batches.concatMap(mapper, prefetch);
    }

    /**
     * Converts the source into a flux of lists: singleton lists when {@code maxSize <= 1},
     * otherwise lists produced by {@code bufferTimeout(maxSize, maxDuration)}.
     */
    private <T> Flux<List<T>> bufferIntoBatches(Publisher<T> publisher) {
        boolean batchingEnabled = maxSize > 1;
        if (!batchingEnabled) {
            return Flux.from(publisher).map(Collections::singletonList);
        }

        // Equivalent to "zero or negative": a Duration compares <= ZERO exactly in those cases.
        boolean durationIsPositive = maxDuration.compareTo(Duration.ZERO) > 0;
        if (!durationIsPositive) {
            throw new IllegalArgumentException("Batching is enabled, but batch duration is not positive");
        }
        return Flux.from(publisher).bufferTimeout(maxSize, maxDuration);
    }
}

