/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;

public abstract class MessageStreamUtils {
    private MessageStreamUtils() {
    }

    public static <M extends Message, R> CompletableFuture<R> reduce(@Nonnull MessageStream<M> source, @Nonnull R identity, @Nonnull BiFunction<R, MessageStream.Entry<M>, R> accumulator) {
        Reducer<M, R> reducer = new Reducer<M, R>(source, identity, accumulator);
        source.setCallback(reducer::process);
        return reducer.result();
    }

    public static <M extends Message> CompletableFuture<MessageStream.Entry<M>> asCompletableFuture(@Nonnull MessageStream<M> source) {
        FirstResult<M> firstResult = new FirstResult<M>(source);
        source.setCallback(firstResult::process);
        return firstResult.result();
    }

    private static class Reducer<M extends Message, R> {
        private final CompletableFuture<R> result;
        private final MessageStream<M> source;
        private final BiFunction<R, MessageStream.Entry<M>, R> accumulator;
        private final AtomicBoolean processingGate = new AtomicBoolean(false);
        private final AtomicReference<R> intermediateResult;

        public Reducer(MessageStream<M> source, R identity, BiFunction<R, MessageStream.Entry<M>, R> accumulator) {
            this.source = source;
            this.intermediateResult = new AtomicReference<R>(identity);
            this.accumulator = accumulator;
            this.result = new CompletableFuture();
        }

        public CompletableFuture<R> result() {
            return this.result;
        }

        public void process() {
            boolean continueOnCurrentThread = true;
            while (continueOnCurrentThread && !this.processingGate.getAndSet(true)) {
                try {
                    while (this.source.hasNextAvailable()) {
                        Optional<MessageStream.Entry<M>> nextItem = this.source.next();
                        nextItem.ifPresent(e -> this.intermediateResult.updateAndGet(i -> this.accumulator.apply((R)i, (MessageStream.Entry<M>)e)));
                    }
                    if (this.source.isCompleted()) {
                        this.source.error().ifPresentOrElse(this.result::completeExceptionally, () -> this.result.complete(this.intermediateResult.get()));
                    }
                }
                catch (Exception e2) {
                    this.result.completeExceptionally(e2);
                    this.source.close();
                }
                finally {
                    this.processingGate.set(false);
                }
                continueOnCurrentThread = !this.result.isDone() && (this.source.hasNextAvailable() || this.source.isCompleted());
            }
        }
    }

    private static class FirstResult<M extends Message> {
        private final MessageStream<M> source;
        private final AtomicBoolean processingGate = new AtomicBoolean(false);
        private final CompletableFuture<MessageStream.Entry<M>> result = new CompletableFuture();

        public FirstResult(MessageStream<M> source) {
            this.source = source;
        }

        public void process() {
            if (!this.processingGate.getAndSet(true)) {
                try {
                    if (!this.result.isDone() && this.source.hasNextAvailable()) {
                        this.source.next().ifPresent(this.result::complete);
                    }
                    if (this.source.isCompleted() && !this.result.isDone()) {
                        this.source.error().ifPresentOrElse(this.result::completeExceptionally, () -> this.result.complete(null));
                    }
                }
                finally {
                    this.processingGate.set(false);
                }
            }
        }

        public CompletableFuture<MessageStream.Entry<M>> result() {
            return this.result;
        }
    }
}

