/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.FluxMessageStream;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.SimpleEntry;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/**
 * Static helpers that convert between {@link MessageStream} and Project Reactor's {@link Flux} /
 * {@link Publisher} types.
 * <p>
 * This class only contains static methods and cannot be instantiated.
 */
public final class FluxUtils {

    private FluxUtils() {
        // Utility class; not meant to be instantiated.
    }

    /**
     * Creates a {@link Flux} that emits the entries of the given {@code source} stream.
     * <p>
     * Entries are only pulled from the {@code source} while the subscriber has outstanding demand. Cancelling the
     * subscription closes the {@code source}. When the {@code source} reports that it is completed, the flux is
     * completed, or terminated with the error reported by the {@code source} if there is one.
     *
     * @param source the stream to read entries from
     * @param <M>    the type of {@link Message} carried by the entries
     * @return a {@link Flux} emitting the entries read from the given {@code source}
     */
    public static <M extends Message> Flux<MessageStream.Entry<M>> of(@Nonnull MessageStream<M> source) {
        return Flux.create(emitter -> {
            FluxStreamAdapter<M> fluxTask = new FluxStreamAdapter<>(source, emitter);
            // Both new demand and a notification from the source trigger the same drain routine.
            emitter.onRequest(i -> fluxTask.process());
            emitter.onCancel(source::close);
            source.onAvailable(fluxTask::process);
        });
    }

    /**
     * Wraps the given {@code flux} of messages in a {@link MessageStream}, pairing every message with an empty
     * {@link Context}.
     *
     * @param flux the messages to expose as a stream
     * @param <M>  the type of {@link Message} emitted by the {@code flux}
     * @return a {@link MessageStream} backed by the given {@code flux}
     */
    public static <M extends Message> MessageStream<M> asMessageStream(@Nonnull Flux<M> flux) {
        return FluxUtils.asMessageStream(flux, message -> Context.empty());
    }

    /**
     * Wraps the given {@code flux} of messages in a {@link MessageStream}, pairing every message with the
     * {@link Context} that the {@code contextSupplier} returns for it.
     *
     * @param flux            the messages to expose as a stream
     * @param contextSupplier function invoked once per emitted message to obtain the {@link Context} of its entry
     * @param <M>             the type of {@link Message} emitted by the {@code flux}
     * @return a {@link MessageStream} backed by the given {@code flux}
     */
    public static <M extends Message> MessageStream<M> asMessageStream(@Nonnull Flux<M> flux,
                                                                       @Nonnull Function<M, Context> contextSupplier) {
        Flux<MessageStream.Entry<M>> entries =
                flux.map(message -> new SimpleEntry<>(message, contextSupplier.apply(message)));
        return new FluxMessageStream<>(entries);
    }

    /**
     * Creates a {@link Publisher} that emits only the messages (without their entry wrapper) of the stream
     * provided by the given supplier.
     * <p>
     * The supplier is passed to {@link Mono#fromSupplier(Supplier)}, so the stream is obtained as part of the
     * reactive pipeline rather than when this method is called.
     *
     * @param stream supplier of the {@link MessageStream} to publish
     * @param <M>    the type of {@link Message} carried by the stream
     * @return a {@link Publisher} emitting the messages of the supplied stream
     */
    public static <M extends Message> Publisher<M> streamToPublisher(Supplier<MessageStream<M>> stream) {
        return Mono.fromSupplier(stream).flatMapMany(FluxUtils::of).map(MessageStream.Entry::message);
    }

    /**
     * Moves entries from a {@link MessageStream} into a {@link FluxSink}, honouring the demand reported by the
     * sink.
     * <p>
     * {@link #process()} may be invoked from several places (demand callbacks, source notifications) and possibly
     * re-entrantly. At most one invocation drains at a time; invocations arriving in the meantime are recorded and
     * cause the active drainer to run another pass, so that no signal is lost.
     *
     * @param <M> the type of {@link Message} carried by the entries
     */
    static class FluxStreamAdapter<M extends Message> {

        /** {@code true} while some invocation of {@link #process()} is draining. */
        private final AtomicBoolean processingGate = new AtomicBoolean(false);
        /** {@code true} when {@link #process()} was invoked and no drain pass has started since. */
        private final AtomicBoolean pendingSignal = new AtomicBoolean(false);
        private final MessageStream<M> source;
        private final FluxSink<MessageStream.Entry<M>> emitter;
        /** Set once a terminal signal was handed to the emitter. Only accessed while holding the gate. */
        private boolean terminated;

        public FluxStreamAdapter(MessageStream<M> source, FluxSink<MessageStream.Entry<M>> emitter) {
            this.source = source;
            this.emitter = emitter;
        }

        /**
         * Emits as many available entries as the downstream has requested and propagates completion or failure
         * of the source.
         * <p>
         * If another invocation is already draining, this call only records that a new signal arrived and returns
         * immediately; the draining invocation performs an additional pass on its behalf. Without this, a signal
         * arriving after the active pass had taken its demand snapshot, or had found the source empty, would be
         * dropped.
         */
        public void process() {
            // Publish the signal BEFORE trying to take the gate: the gate holder re-reads this flag after
            // releasing the gate, so either this call wins the gate or the holder notices the flag.
            this.pendingSignal.set(true);
            while (this.pendingSignal.get() && this.processingGate.compareAndSet(false, true)) {
                try {
                    // Cleared before draining, so signals raised during the pass trigger another one.
                    this.pendingSignal.set(false);
                    this.drain();
                } finally {
                    this.processingGate.set(false);
                }
            }
        }

        /**
         * Performs a single drain pass. Must only be called while holding the processing gate.
         */
        private void drain() {
            if (this.terminated) {
                // A terminal signal was already delivered; do not signal the emitter a second time.
                return;
            }
            try {
                long remaining = this.emitter.requestedFromDownstream();
                while (remaining-- > 0L && this.source.hasNextAvailable() && !this.emitter.isCancelled()) {
                    this.source.next().ifPresent(this.emitter::next);
                }
                if (this.source.isCompleted()) {
                    this.terminated = true;
                    this.source.error().ifPresentOrElse(this.emitter::error, this.emitter::complete);
                }
            } catch (Exception e) {
                this.terminated = true;
                this.emitter.error(e);
                this.source.close();
            }
        }
    }
}

