/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.server.tasks;

import io.a2a.server.events.EventConsumer;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.tasks.TaskManager;
import io.a2a.server.util.async.AsyncUtils;
import io.a2a.spec.A2AError;
import io.a2a.spec.Event;
import io.a2a.spec.EventKind;
import io.a2a.spec.InternalError;
import io.a2a.spec.Message;
import io.a2a.spec.Task;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.util.Utils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumes the events produced for a request and reduces them to a single result:
 * either a {@link Message} seen on the event stream or the {@link Task} held by the
 * {@link TaskManager}.
 *
 * <p>The most recently observed {@link Message} is kept in a {@code volatile} field, so
 * {@link #getCurrentResult()} may be called from a different thread than the one that is
 * consuming events.
 */
public class ResultAggregator {
    private static final Logger LOGGER = LoggerFactory.getLogger(ResultAggregator.class);

    /** How long {@link #consumeAndBreakOnInterrupt} waits for the background consumer to start. */
    private static final long POLLING_START_TIMEOUT_SECONDS = 5L;

    private final TaskManager taskManager;
    // NOTE(review): 'executor' is stored but never read by any method visible in this class.
    private final Executor executor;
    private final Executor eventConsumerExecutor;
    private volatile @Nullable Message message;

    /**
     * Creates an aggregator.
     *
     * @param taskManager source of the current {@link Task} when no message has been seen
     * @param message initial message result, or {@code null} if there is none yet
     * @param executor general-purpose executor (retained, not used by the methods in this class)
     * @param eventConsumerExecutor executor on which event subscription and consumption is started
     */
    public ResultAggregator(TaskManager taskManager, @Nullable Message message, Executor executor, Executor eventConsumerExecutor) {
        this.taskManager = taskManager;
        this.message = message;
        this.executor = executor;
        this.eventConsumerExecutor = eventConsumerExecutor;
    }

    /**
     * Returns the current result without consuming any events.
     *
     * @return the last message recorded by this aggregator if there is one, otherwise whatever
     *         {@link TaskManager#getTask()} returns (which may be {@code null})
     */
    public @Nullable EventKind getCurrentResult() {
        // Read the volatile field once so the null check and the return see the same value.
        Message current = this.message;
        if (current != null) {
            return current;
        }
        return this.taskManager.getTask();
    }

    /**
     * Exposes all of the consumer's events as a publisher whose subscription is started on
     * the event-consumer executor.
     *
     * @param consumer the event consumer to read from
     * @return a publisher that, when subscribed to, hands the subscriber to the processed
     *         event stream from a task submitted to {@code eventConsumerExecutor}
     */
    public Flow.Publisher<EventQueueItem> consumeAndEmit(EventConsumer consumer) {
        Flow.Publisher<EventQueueItem> allItems = consumer.consumeAll();
        // The callback accepts every item unchanged.
        final Flow.Publisher<EventQueueItem> processed =
                AsyncUtils.processor(AsyncUtils.createTubeConfig(), allItems, (errorConsumer, item) -> true);
        return subscriber -> this.eventConsumerExecutor.execute(() -> processed.subscribe(subscriber));
    }

    /**
     * Consumes the consumer's events and returns the final result.
     *
     * <p>NOTE(review): this relies on {@code AsyncUtils.consumer} having finished (or at least
     * having reported any error) by the time it returns, since the error and result holders are
     * read immediately afterwards - confirm against {@code AsyncUtils}.
     *
     * @param consumer the event consumer to read from
     * @return the first {@link Message} seen on the stream, otherwise the task from the
     *         {@link TaskManager}
     * @throws A2AError if consumption reported an error (rethrown via {@code Utils.rethrow}) or
     *         if neither a message nor a task is available
     */
    public EventKind consumeAll(EventConsumer consumer) throws A2AError {
        AtomicReference<EventKind> returnedEvent = new AtomicReference<>();
        Flow.Publisher<EventQueueItem> allItems = consumer.consumeAll();
        AtomicReference<Throwable> error = new AtomicReference<>();
        AsyncUtils.consumer(AsyncUtils.createTubeConfig(), allItems, item -> {
            Event event = item.getEvent();
            if (event instanceof Message msg) {
                this.message = msg;
                if (returnedEvent.get() == null) {
                    returnedEvent.set(msg);
                    // Presumably 'false' tells the consumer to stop - confirm against AsyncUtils.
                    return false;
                }
            }
            return true;
        }, error::set);

        Throwable failure = error.get();
        if (failure != null) {
            Utils.rethrow(failure);
        }
        EventKind result = returnedEvent.get();
        if (result != null) {
            return result;
        }
        Task task = this.taskManager.getTask();
        if (task == null) {
            throw new InternalError("No task or message available after consuming all events");
        }
        return task;
    }

    /**
     * Starts consuming events on the event-consumer executor and returns as soon as a first
     * result is available, leaving the remaining events to be consumed in the background.
     *
     * <p>Per event:
     * <ul>
     *   <li>a {@link Throwable} event fails the call;</li>
     *   <li>a {@link Message} event is recorded as the result and completes the call;</li>
     *   <li>any other event marks the call as interrupted and completes it, while the handler
     *       keeps returning {@code true} so later events are still processed. A {@link Task}
     *       event is additionally remembered as the result candidate.</li>
     * </ul>
     *
     * <p>The returned {@code consumptionFuture} is completed immediately on the first
     * interrupting event when {@code blocking} is {@code false}; otherwise it is completed
     * (normally or exceptionally) only by the consumer's completion callback.
     *
     * @param consumer the event consumer to read from
     * @param blocking whether the caller requested a blocking call; affects logging and when
     *        the consumption future is completed
     * @return the selected result (message, then captured task, then the task from the
     *         {@link TaskManager}), the interrupted flag, and the consumption future
     * @throws A2AError if the waiting thread is interrupted, if consumption failed, or if no
     *         task or message could be found
     */
    public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws A2AError {
        Flow.Publisher<EventQueueItem> allItems = consumer.consumeAll();
        AtomicReference<Message> capturedMessage = new AtomicReference<>();
        AtomicReference<Task> capturedTask = new AtomicReference<>();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        AtomicReference<Throwable> errorRef = new AtomicReference<>();
        // Completed when this method may return to its caller.
        CompletableFuture<Void> completionFuture = new CompletableFuture<>();
        // Handed back to the caller as the consumption future.
        CompletableFuture<Void> consumptionCompletionFuture = new CompletableFuture<>();
        CountDownLatch pollingStarted = new CountDownLatch(1);

        CompletableFuture.runAsync(() -> {
            pollingStarted.countDown();
            AsyncUtils.consumer(AsyncUtils.createTubeConfig(), allItems, item -> {
                Event event = item.getEvent();
                if (event instanceof Throwable failure) {
                    errorRef.set(failure);
                    completionFuture.completeExceptionally(failure);
                    return false;
                }
                if (event instanceof Message msg) {
                    this.message = msg;
                    capturedMessage.set(msg);
                    completionFuture.complete(null);
                    return false;
                }
                if (event instanceof Task currentTask) {
                    Task previousTask = capturedTask.getAndSet(currentTask);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Captured Task event: id={}, state={} (previous: {})",
                                currentTask.id(),
                                currentTask.status().state(),
                                previousTask != null ? previousTask.id() + "/" + previousTask.status().state() : "none");
                    }
                }

                boolean finalEvent = isFinalEvent(event);
                boolean authRequired = isAuthRequired(event);
                LOGGER.debug("ResultAggregator: Evaluating interrupt (blocking={}, isFinal={}, isAuth={}, eventType={})",
                        blocking, finalEvent, authRequired, event.getClass().getSimpleName());

                // Every event that reaches this point interrupts the caller's wait; the branches
                // below differ only in what they log.
                if (authRequired) {
                    LOGGER.debug("ResultAggregator: Setting shouldInterrupt=true (AUTH_REQUIRED)");
                } else if (!blocking) {
                    LOGGER.debug("ResultAggregator: Setting shouldInterrupt=true (non-blocking)");
                } else {
                    LOGGER.debug("ResultAggregator: Setting shouldInterrupt=true (blocking, isFinal={})", finalEvent);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Blocking call for task {}: {} event, returning with background consumption",
                                this.taskIdForLogging(), finalEvent ? "final" : "non-final");
                    }
                }

                LOGGER.debug("ResultAggregator: Interrupting consumption (setting interrupted=true)");
                interrupted.set(true);
                completionFuture.complete(null);
                if (!blocking) {
                    consumptionCompletionFuture.complete(null);
                }
                if (LOGGER.isDebugEnabled()) {
                    String reason = authRequired ? "auth-required" : (blocking ? "blocking" : "non-blocking");
                    LOGGER.debug("Task {}: Continuing background consumption (reason: {})", this.taskIdForLogging(), reason);
                }
                // Keep consuming in the background after the caller has been released.
                return true;
            }, throwable -> {
                if (throwable != null) {
                    errorRef.set(throwable);
                    completionFuture.completeExceptionally(throwable);
                    consumptionCompletionFuture.completeExceptionally(throwable);
                } else {
                    completionFuture.complete(null);
                    consumptionCompletionFuture.complete(null);
                }
            });
        }, this.eventConsumerExecutor);

        try {
            // Best effort: a timeout is reported but does not fail the call.
            if (!pollingStarted.await(POLLING_START_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOGGER.warn("EventConsumer polling did not start within {} seconds; continuing to wait for completion",
                        POLLING_START_TIMEOUT_SECONDS);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new InternalError("Interrupted waiting for EventConsumer to start");
        }

        try {
            completionFuture.join();
        }
        catch (CompletionException e) {
            Throwable cause = e.getCause();
            if (cause != null) {
                Utils.rethrow(cause);
            }
            throw e;
        }

        Throwable failure = errorRef.get();
        if (failure != null) {
            Utils.rethrow(failure);
        }

        // Result precedence: message, then the task captured from the stream, then the task manager's task.
        EventKind result = capturedMessage.get();
        if (result == null) {
            Task streamed = capturedTask.get();
            if (streamed != null && LOGGER.isDebugEnabled()) {
                LOGGER.debug("Returning capturedTask: id={}, state={}", streamed.id(), streamed.status().state());
            }
            result = streamed;
        }
        if (result == null) {
            Task stored = this.taskManager.getTask();
            if (stored != null && LOGGER.isDebugEnabled()) {
                LOGGER.debug("Returning task from TaskStore: id={}, state={}", stored.id(), stored.status().state());
            }
            result = stored;
        }
        if (result == null) {
            throw new InternalError("Could not find a Task/Message for " + this.taskManager.getTaskId());
        }
        return new EventTypeAndInterrupt(result, interrupted.get(), consumptionCompletionFuture);
    }

    /** Returns whether the event is a task in a final state, a final status update, or an {@link A2AError}. */
    private static boolean isFinalEvent(Event event) {
        return (event instanceof Task task && task.status().state().isFinal())
                || (event instanceof TaskStatusUpdateEvent update && update.isFinal())
                || event instanceof A2AError;
    }

    /** Returns whether the event is a task or status update whose state is {@code TASK_STATE_AUTH_REQUIRED}. */
    private static boolean isAuthRequired(Event event) {
        return (event instanceof Task task && task.status().state() == TaskState.TASK_STATE_AUTH_REQUIRED)
                || (event instanceof TaskStatusUpdateEvent update && update.status().state() == TaskState.TASK_STATE_AUTH_REQUIRED);
    }

    /** Returns the id of the task manager's current task for log output, or {@code "unknown"} if there is none. */
    private String taskIdForLogging() {
        Task task = this.taskManager.getTask();
        return task != null ? task.id() : "unknown";
    }

    /**
     * Result of {@link #consumeAndBreakOnInterrupt(EventConsumer, boolean)}.
     *
     * @param eventType the message or task selected as the result
     * @param interrupted whether the call returned because of an interrupting event
     * @param consumptionFuture future tracking the remaining event consumption
     */
    public record EventTypeAndInterrupt(EventKind eventType, boolean interrupted, CompletableFuture<Void> consumptionFuture) {
    }
}

