/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.server.requesthandlers;

import io.a2a.server.ServerCallContext;
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.agentexecution.SimpleRequestContextBuilder;
import io.a2a.server.events.EnhancedRunnable;
import io.a2a.server.events.EventConsumer;
import io.a2a.server.events.EventQueue;
import io.a2a.server.events.QueueManager;
import io.a2a.server.events.TaskQueueExistsException;
import io.a2a.server.requesthandlers.RequestHandler;
import io.a2a.server.tasks.PushNotificationConfigStore;
import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.ResultAggregator;
import io.a2a.server.tasks.TaskManager;
import io.a2a.server.tasks.TaskStore;
import io.a2a.server.util.async.AsyncUtils;
import io.a2a.server.util.async.Internal;
import io.a2a.spec.DeleteTaskPushNotificationConfigParams;
import io.a2a.spec.Event;
import io.a2a.spec.EventKind;
import io.a2a.spec.GetTaskPushNotificationConfigParams;
import io.a2a.spec.InternalError;
import io.a2a.spec.JSONRPCError;
import io.a2a.spec.ListTaskPushNotificationConfigParams;
import io.a2a.spec.MessageSendParams;
import io.a2a.spec.PushNotificationConfig;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
import io.a2a.spec.TaskIdParams;
import io.a2a.spec.TaskNotFoundError;
import io.a2a.spec.TaskPushNotificationConfig;
import io.a2a.spec.TaskQueryParams;
import io.a2a.spec.UnsupportedOperationError;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link RequestHandler} implementation.
 *
 * <p>Requests are served by looking tasks up in a {@link TaskStore}, running or cancelling them
 * through an {@link AgentExecutor}, and moving events through per-task {@link EventQueue}s obtained
 * from a {@link QueueManager}. The push-notification store and sender are null-checked before use,
 * so either may be absent.
 *
 * <p>Agent runs started by this handler are tracked in {@code runningAgents}, keyed by task id,
 * until {@code cleanupProducer} removes them.
 */
@ApplicationScoped
public class DefaultRequestHandler implements RequestHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRequestHandler.class);

    private final AgentExecutor agentExecutor;
    private final TaskStore taskStore;
    private final QueueManager queueManager;
    private final PushNotificationConfigStore pushConfigStore;
    private final PushNotificationSender pushSender;
    private final Supplier<RequestContext.Builder> requestContextBuilder;
    /** Futures of the agent runs started by this handler, keyed by task id. */
    private final ConcurrentMap<String, CompletableFuture<Void>> runningAgents = new ConcurrentHashMap<>();
    private final Executor executor;

    /**
     * Creates the handler from its injected collaborators.
     *
     * @param agentExecutor executes and cancels agent work
     * @param taskStore source of persisted tasks
     * @param queueManager provides the per-task event queues
     * @param pushConfigStore push-notification configuration store; checked for {@code null} before use
     * @param pushSender push-notification sender; checked for {@code null} before use
     * @param executor executor on which agent runs are scheduled
     */
    @Inject
    public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore, QueueManager queueManager, PushNotificationConfigStore pushConfigStore, PushNotificationSender pushSender, @Internal Executor executor) {
        this.agentExecutor = agentExecutor;
        this.taskStore = taskStore;
        this.queueManager = queueManager;
        this.pushConfigStore = pushConfigStore;
        this.pushSender = pushSender;
        this.executor = executor;
        this.requestContextBuilder = () -> new SimpleRequestContextBuilder(taskStore, false);
    }

    /**
     * Returns the stored task, optionally with its history truncated.
     *
     * <p>When {@code params.historyLength()} is set and smaller than the stored history size, the
     * returned task carries only the last {@code historyLength} entries of the history list; a
     * non-positive value yields an empty history.
     *
     * @param params task id and optional history length
     * @param context call context (not used by this method)
     * @return the task, possibly rebuilt with a shortened history
     * @throws TaskNotFoundError if the store has no task for the id
     */
    @Override
    public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws JSONRPCError {
        LOGGER.debug("onGetTask {}", params.id());
        Task task = this.taskStore.get(params.id());
        if (task == null) {
            LOGGER.debug("No task found for {}. Throwing TaskNotFoundError", params.id());
            throw new TaskNotFoundError();
        }
        var limit = params.historyLength();
        var history = task.getHistory();
        if (limit != null && history != null && limit < history.size()) {
            int size = history.size();
            // subList's upper bound is exclusive, so it must be size (not size - 1) to keep the
            // final entry and return exactly `limit` items.
            task = new Task.Builder(task)
                    .history(limit <= 0 ? new ArrayList<>() : history.subList(size - limit, size))
                    .build();
        }
        LOGGER.debug("Task found {}", task);
        return task;
    }

    /**
     * Asks the agent executor to cancel a task and returns the resulting task.
     *
     * <p>The task's existing queue is tapped, or a fresh queue is created when none is available.
     * After the executor's {@code cancel} call, any tracked agent run for the task is cancelled
     * and all events on the queue are consumed to obtain the result.
     *
     * @param params id of the task to cancel
     * @param context call context forwarded to the agent through the request context
     * @return the task produced by consuming the queue
     * @throws TaskNotFoundError if the store has no task for the id
     * @throws InternalError if the consumed result is not a {@link Task}
     */
    @Override
    public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws JSONRPCError {
        Task task = this.taskStore.get(params.id());
        if (task == null) {
            throw new TaskNotFoundError();
        }
        TaskManager taskManager = new TaskManager(task.getId(), task.getContextId(), this.taskStore, null);
        ResultAggregator resultAggregator = new ResultAggregator(taskManager, null);

        EventQueue queue = this.queueManager.tap(task.getId());
        if (queue == null) {
            queue = EventQueue.create();
        }

        RequestContext cancelContext = this.requestContextBuilder.get()
                .setTaskId(task.getId())
                .setContextId(task.getContextId())
                .setTask(task)
                .setServerCallContext(context)
                .build();
        this.agentExecutor.cancel(cancelContext, queue);

        Optional.ofNullable(this.runningAgents.get(task.getId())).ifPresent(cf -> cf.cancel(true));

        EventConsumer consumer = new EventConsumer(queue);
        EventKind result = resultAggregator.consumeAll(consumer);
        if (result instanceof Task cancelledTask) {
            return cancelledTask;
        }
        throw new InternalError("Agent did not return a valid response");
    }

    /**
     * Runs the agent for a message and returns the event kind reported by the aggregator.
     *
     * <p>The agent run is started asynchronously; this method then waits on
     * {@code ResultAggregator.consumeAndBreakOnInterrupt} for the result. Producer cleanup is
     * scheduled whether the call succeeds or fails.
     *
     * @param params the message and optional configuration
     * @param context call context forwarded to the agent through the request context
     * @return the event kind carried by the aggregator's result
     * @throws InternalError if the aggregator returns no result, or returns a {@link Task} whose id
     *     differs from the request's task id
     */
    @Override
    public EventKind onMessageSend(MessageSendParams params, ServerCallContext context) throws JSONRPCError {
        LOGGER.debug("onMessageSend - task: {}; context {}", params.message().getTaskId(), params.message().getContextId());
        MessageSendSetup mss = this.initMessageSend(params, context);
        String taskId = mss.requestContext().getTaskId();
        LOGGER.debug("Request context taskId: {}", taskId);

        EventQueue queue = this.queueManager.createOrTap(taskId);
        ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager(), null);
        EnhancedRunnable producerRunnable = this.registerAndExecuteAgentAsync(taskId, mss.requestContext(), queue);

        ResultAggregator.EventTypeAndInterrupt etai;
        try {
            EventConsumer consumer = new EventConsumer(queue);
            // The done-callback is registered before consumption starts.
            producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
            etai = resultAggregator.consumeAndBreakOnInterrupt(consumer);
            if (etai == null) {
                LOGGER.debug("No result, throwing InternalError");
                throw new InternalError("No result");
            }
            LOGGER.debug("Was interrupted: {}", etai.interrupted());
            // Null-safe comparison, matching the check used by the streaming path.
            if (etai.eventType() instanceof Task taskResult && !Objects.equals(taskId, taskResult.getId())) {
                throw new InternalError("Task ID mismatch in agent response");
            }
        } finally {
            // Cleanup is the same whether or not the consumer was interrupted.
            this.cleanupProducer(taskId);
        }
        LOGGER.debug("Returning: {}", etai.eventType());
        return etai.eventType();
    }

    /**
     * Runs the agent for a message and returns a publisher of the resulting events.
     *
     * <p>For every event passing through the publisher: a {@link Task} event is checked against
     * the request's task id (a mismatch is reported to the error consumer), the queue is registered
     * under the task's id, and push configuration from the request is stored when available.
     * Afterwards, if a push sender is present and the aggregator's current result is a
     * {@link Task}, a notification is sent for it.
     *
     * @param params the message and optional configuration
     * @param context call context forwarded to the agent through the request context
     * @return publisher of the events cast to {@link StreamingEventKind}
     */
    @Override
    public Flow.Publisher<StreamingEventKind> onMessageSendStream(MessageSendParams params, ServerCallContext context) throws JSONRPCError {
        LOGGER.debug("onMessageSendStream - task: {}; context {}", params.message().getTaskId(), params.message().getContextId());
        MessageSendSetup mss = this.initMessageSend(params, context);
        // Held in a reference because the event callback below may replace the id.
        AtomicReference<String> taskId = new AtomicReference<>(mss.requestContext().getTaskId());
        EventQueue queue = this.queueManager.createOrTap(taskId.get());
        ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager(), null);
        EnhancedRunnable producerRunnable = this.registerAndExecuteAgentAsync(taskId.get(), mss.requestContext(), queue);
        try {
            EventConsumer consumer = new EventConsumer(queue);
            // The done-callback is registered before consumption starts.
            producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
            Flow.Publisher<Event> results = resultAggregator.consumeAndEmit(consumer);

            Flow.Publisher<Event> eventPublisher = AsyncUtils.processor(AsyncUtils.createTubeConfig(), results, (errorConsumer, event) -> {
                if (event instanceof Task createdTask) {
                    if (!Objects.equals(taskId.get(), createdTask.getId())) {
                        errorConsumer.accept(new InternalError("Task ID mismatch in agent response"));
                    }
                    try {
                        this.queueManager.add(createdTask.getId(), queue);
                        taskId.set(createdTask.getId());
                    } catch (TaskQueueExistsException e) {
                        // Best effort: processing continues with the current id, but the
                        // rejection is logged instead of being dropped silently.
                        LOGGER.debug("queueManager.add rejected queue for task {}: {}", createdTask.getId(), e.toString());
                    }
                    if (this.shouldAddPushInfo(params)) {
                        this.pushConfigStore.setInfo(createdTask.getId(), params.configuration().pushNotification());
                    }
                }
                if (this.pushSender != null && taskId.get() != null) {
                    EventKind latest = resultAggregator.getCurrentResult();
                    if (latest instanceof Task latestTask) {
                        this.pushSender.sendNotification(latestTask);
                    }
                }
                return true;
            });

            return AsyncUtils.convertingProcessor(eventPublisher, event -> (StreamingEventKind) event);
        } finally {
            this.cleanupProducer(taskId.get());
        }
    }

    /**
     * Stores a push-notification configuration for an existing task.
     *
     * @param params task id and the configuration to store
     * @param context call context (not used by this method)
     * @return the task id paired with the configuration returned by the store
     * @throws UnsupportedOperationError if no push configuration store is available
     * @throws TaskNotFoundError if the store has no task for the id
     */
    @Override
    public TaskPushNotificationConfig onSetTaskPushNotificationConfig(TaskPushNotificationConfig params, ServerCallContext context) throws JSONRPCError {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        if (this.taskStore.get(params.taskId()) == null) {
            throw new TaskNotFoundError();
        }
        PushNotificationConfig stored = this.pushConfigStore.setInfo(params.taskId(), params.pushNotificationConfig());
        return new TaskPushNotificationConfig(params.taskId(), stored);
    }

    /**
     * Returns one push-notification configuration of a task.
     *
     * <p>The configuration whose id equals {@code params.pushNotificationConfigId()} is returned;
     * when no id is given or none matches, the first stored configuration is returned.
     *
     * @param params task id and optional configuration id
     * @param context call context (not used by this method)
     * @return the task id paired with the selected configuration
     * @throws UnsupportedOperationError if no push configuration store is available
     * @throws TaskNotFoundError if the store has no task for the id
     * @throws InternalError if the task has no stored configuration
     */
    @Override
    public TaskPushNotificationConfig onGetTaskPushNotificationConfig(GetTaskPushNotificationConfigParams params, ServerCallContext context) throws JSONRPCError {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        if (this.taskStore.get(params.id()) == null) {
            throw new TaskNotFoundError();
        }
        List<PushNotificationConfig> configs = this.pushConfigStore.getInfo(params.id());
        if (configs == null || configs.isEmpty()) {
            throw new InternalError("No push notification config found");
        }
        PushNotificationConfig selected = this.getPushNotificationConfig(configs, params.pushNotificationConfigId());
        return new TaskPushNotificationConfig(params.id(), selected);
    }

    /**
     * Picks the configuration with the given id, falling back to the first element.
     *
     * @param notificationConfigList non-empty list of candidates
     * @param configId id to look for; may be {@code null}
     * @return the matching configuration, or the first element when there is no match
     */
    private PushNotificationConfig getPushNotificationConfig(List<PushNotificationConfig> notificationConfigList, String configId) {
        if (configId != null) {
            for (PushNotificationConfig candidate : notificationConfigList) {
                if (configId.equals(candidate.id())) {
                    return candidate;
                }
            }
        }
        return notificationConfigList.get(0);
    }

    /**
     * Attaches to the event queue of an existing task and returns a publisher of its events.
     *
     * @param params id of the task to resubscribe to
     * @param context call context (not used by this method)
     * @return publisher of the queue's events cast to {@link StreamingEventKind}
     * @throws TaskNotFoundError if the task is unknown or no queue can be tapped for it
     */
    @Override
    public Flow.Publisher<StreamingEventKind> onResubscribeToTask(TaskIdParams params, ServerCallContext context) throws JSONRPCError {
        Task task = this.taskStore.get(params.id());
        if (task == null) {
            throw new TaskNotFoundError();
        }
        TaskManager taskManager = new TaskManager(task.getId(), task.getContextId(), this.taskStore, null);
        ResultAggregator resultAggregator = new ResultAggregator(taskManager, null);
        EventQueue queue = this.queueManager.tap(task.getId());
        if (queue == null) {
            throw new TaskNotFoundError();
        }
        Flow.Publisher<Event> results = resultAggregator.consumeAndEmit(new EventConsumer(queue));
        return AsyncUtils.convertingProcessor(results, e -> (StreamingEventKind) e);
    }

    /**
     * Lists every push-notification configuration stored for a task.
     *
     * @param params id of the task
     * @param context call context (not used by this method)
     * @return one entry per stored configuration; empty when the store returns {@code null} or nothing
     * @throws UnsupportedOperationError if no push configuration store is available
     * @throws TaskNotFoundError if the store has no task for the id
     */
    @Override
    public List<TaskPushNotificationConfig> onListTaskPushNotificationConfig(ListTaskPushNotificationConfigParams params, ServerCallContext context) throws JSONRPCError {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        if (this.taskStore.get(params.id()) == null) {
            throw new TaskNotFoundError();
        }
        List<PushNotificationConfig> configs = this.pushConfigStore.getInfo(params.id());
        List<TaskPushNotificationConfig> result = new ArrayList<>();
        if (configs != null) {
            for (PushNotificationConfig config : configs) {
                result.add(new TaskPushNotificationConfig(params.id(), config));
            }
        }
        return result;
    }

    /**
     * Deletes one push-notification configuration of a task.
     *
     * @param params task id and id of the configuration to delete
     * @param context call context (not used by this method)
     * @throws UnsupportedOperationError if no push configuration store is available
     * @throws TaskNotFoundError if the store has no task for the id
     */
    @Override
    public void onDeleteTaskPushNotificationConfig(DeleteTaskPushNotificationConfigParams params, ServerCallContext context) {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        if (this.taskStore.get(params.id()) == null) {
            throw new TaskNotFoundError();
        }
        this.pushConfigStore.deleteInfo(params.id(), params.pushNotificationConfigId());
    }

    /** Tells whether a push store exists and the request carries a push-notification configuration. */
    private boolean shouldAddPushInfo(MessageSendParams params) {
        return this.pushConfigStore != null && params.configuration() != null && params.configuration().pushNotification() != null;
    }

    /**
     * Schedules the agent run on the executor and records its future under {@code taskId}.
     *
     * <p>When the run completes, normally or not, any error is stored on the runnable, the queue is
     * closed and the runnable's done-callbacks are invoked, in that order.
     *
     * @param taskId key under which the run's future is tracked
     * @param requestContext context handed to the agent executor
     * @param queue queue the agent publishes its events to
     * @return the runnable, so callers can register done-callbacks on it
     */
    private EnhancedRunnable registerAndExecuteAgentAsync(String taskId, RequestContext requestContext, EventQueue queue) {
        EnhancedRunnable runnable = new EnhancedRunnable() {
            @Override
            public void run() {
                DefaultRequestHandler.this.agentExecutor.execute(requestContext, queue);
                try {
                    DefaultRequestHandler.this.queueManager.awaitQueuePollerStart(queue);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
            }
        };
        CompletableFuture<Void> agentFuture = CompletableFuture.runAsync(runnable, this.executor)
                .whenComplete((v, err) -> {
                    if (err != null) {
                        runnable.setError(err);
                    }
                    queue.close();
                    runnable.invokeDoneCallbacks();
                });
        this.runningAgents.put(taskId, agentFuture);
        return runnable;
    }

    /**
     * Schedules cleanup for a task's agent run: once the tracked future completes, the task's
     * queue is closed through the queue manager and the future is dropped from {@code runningAgents}.
     *
     * <p>Does nothing when no run is tracked for {@code taskId}. This method is called from
     * {@code finally} blocks, where a {@link NullPointerException} would replace the exception
     * that is actually propagating.
     *
     * @param taskId id of the task whose producer should be cleaned up
     */
    private void cleanupProducer(String taskId) {
        // ConcurrentHashMap rejects null keys, so only look up a non-null id.
        CompletableFuture<Void> agentFuture = taskId == null ? null : this.runningAgents.get(taskId);
        if (agentFuture == null) {
            LOGGER.debug("No running agent registered for task {}; nothing to clean up", taskId);
            return;
        }
        agentFuture.whenComplete((v, t) -> {
            this.queueManager.close(taskId);
            this.runningAgents.remove(taskId);
        });
    }

    /**
     * Prepares the state shared by the blocking and streaming send paths.
     *
     * <p>If the task manager already has a task, it is updated with the incoming message and, when
     * the request carries push configuration, that configuration is stored. A request context is
     * then built from the parameters and the (possibly absent) task.
     *
     * @param params the message and optional configuration
     * @param context call context placed into the request context
     * @return the task manager, the task ({@code null} if none was found) and the request context
     */
    private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallContext context) {
        TaskManager taskManager = new TaskManager(params.message().getTaskId(), params.message().getContextId(), this.taskStore, params.message());
        Task task = taskManager.getTask();
        if (task != null) {
            LOGGER.debug("Found task updating with message {}", params.message());
            task = taskManager.updateWithMessage(params.message(), task);
            if (this.shouldAddPushInfo(params)) {
                LOGGER.debug("Adding push info");
                this.pushConfigStore.setInfo(task.getId(), params.configuration().pushNotification());
            }
        }
        RequestContext requestContext = this.requestContextBuilder.get()
                .setParams(params)
                .setTaskId(task == null ? null : task.getId())
                .setContextId(params.message().getContextId())
                .setTask(task)
                .setServerCallContext(context)
                .build();
        return new MessageSendSetup(taskManager, task, requestContext);
    }

    /** Bundle of the objects prepared by {@code initMessageSend}. */
    private record MessageSendSetup(TaskManager taskManager, Task task, RequestContext requestContext) {
    }
}

