/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.server.events;

import io.a2a.server.events.EventEnqueueHook;
import io.a2a.server.events.EventQueue;
import io.a2a.server.events.LocalEventQueueItem;
import io.a2a.server.events.MainEventBus;
import io.a2a.server.events.MainEventBusContext;
import io.a2a.server.events.MainEventBusProcessorCallback;
import io.a2a.server.events.QueueManager;
import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.TaskManager;
import io.a2a.server.tasks.TaskStore;
import io.a2a.spec.A2AError;
import io.a2a.spec.Event;
import io.a2a.spec.InternalError;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
import io.a2a.spec.TaskArtifactUpdateEvent;
import io.a2a.spec.TaskStatusUpdateEvent;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class MainEventBusProcessor
implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(MainEventBusProcessor.class);
    private volatile MainEventBusProcessorCallback callback = MainEventBusProcessorCallback.NOOP;
    @Nullable
    private volatile Executor pushNotificationExecutor = null;
    private MainEventBus eventBus;
    private TaskStore taskStore;
    private PushNotificationSender pushSender;
    private QueueManager queueManager;
    private volatile boolean running = true;
    @Nullable
    private Thread processorThread;

    protected MainEventBusProcessor() {
    }

    @Inject
    public MainEventBusProcessor(MainEventBus eventBus, TaskStore taskStore, PushNotificationSender pushSender, QueueManager queueManager) {
        this.eventBus = eventBus;
        this.taskStore = taskStore;
        this.pushSender = pushSender;
        this.queueManager = queueManager;
    }

    public void setCallback(MainEventBusProcessorCallback callback) {
        this.callback = callback != null ? callback : MainEventBusProcessorCallback.NOOP;
    }

    public void setPushNotificationExecutor(Executor executor) {
        this.pushNotificationExecutor = executor;
    }

    @PostConstruct
    void start() {
        this.processorThread = new Thread((Runnable)this, "MainEventBusProcessor");
        this.processorThread.setDaemon(true);
        this.processorThread.start();
        LOGGER.info("MainEventBusProcessor started");
    }

    public void ensureStarted() {
    }

    @PreDestroy
    void stop() {
        LOGGER.info("MainEventBusProcessor stopping...");
        this.running = false;
        if (this.processorThread != null) {
            this.processorThread.interrupt();
            try {
                long start = System.currentTimeMillis();
                this.processorThread.join(5000L);
                long elapsed = System.currentTimeMillis() - start;
                LOGGER.info("MainEventBusProcessor thread stopped in {}ms", (Object)elapsed);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.warn("Interrupted while waiting for MainEventBusProcessor thread to stop");
            }
        }
        LOGGER.info("MainEventBusProcessor stopped");
    }

    @Override
    public void run() {
        LOGGER.info("MainEventBusProcessor processing loop started");
        while (this.running) {
            try {
                LOGGER.debug("MainEventBusProcessor: Waiting for event from MainEventBus...");
                MainEventBusContext context = this.eventBus.take();
                LOGGER.debug("MainEventBusProcessor: Retrieved event for task {} from MainEventBus", (Object)context.taskId());
                this.processEvent(context);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.info("MainEventBusProcessor interrupted, shutting down");
                break;
            }
            catch (Exception e) {
                LOGGER.error("Error processing event from MainEventBus", (Throwable)e);
            }
        }
        LOGGER.info("MainEventBusProcessor processing loop ended");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processEvent(MainEventBusContext context) {
        String taskId = context.taskId();
        Event event = context.eventQueueItem().getEvent();
        EventQueue.MainQueue mainQueue = context.eventQueue();
        LOGGER.debug("MainEventBusProcessor: Processing event for task {}: {}", (Object)taskId, (Object)event.getClass().getSimpleName());
        Object eventToDistribute = null;
        boolean isReplicated = context.eventQueueItem().isReplicated();
        try {
            String errorMessage;
            try {
                boolean isFinal = this.updateTaskStore(taskId, event, isReplicated);
                eventToDistribute = event;
                if (!isFinal) {
                    EventEnqueueHook hook = mainQueue.getEnqueueHook();
                    if (hook != null) {
                        LOGGER.debug("Triggering replication hook for task {} after successful persistence", (Object)taskId);
                        hook.onEnqueue(context.eventQueueItem());
                    }
                } else {
                    LOGGER.debug("Task {} is final - skipping replication hook (handled by ReplicatedQueueManager)", (Object)taskId);
                }
            }
            catch (InternalError e) {
                LOGGER.error("Failed to persist event for task {}, distributing error to clients", (Object)taskId, (Object)e);
                errorMessage = "Failed to persist event: " + e.getMessage();
                eventToDistribute = e;
            }
            catch (Exception e) {
                LOGGER.error("Failed to persist event for task {}, distributing error to clients", (Object)taskId, (Object)e);
                errorMessage = "Failed to persist event: " + e.getMessage();
                eventToDistribute = new InternalError(errorMessage);
            }
            if (!isReplicated && event instanceof StreamingEventKind) {
                StreamingEventKind streamingEvent = (StreamingEventKind)event;
                this.sendPushNotification(taskId, streamingEvent);
            }
            int childCount = mainQueue.getChildCount();
            LOGGER.debug("MainEventBusProcessor: Distributing {} to {} children for task {}", new Object[]{eventToDistribute.getClass().getSimpleName(), childCount, taskId});
            LocalEventQueueItem itemToDistribute = new LocalEventQueueItem((Event)eventToDistribute);
            mainQueue.distributeToChildren(itemToDistribute);
            LOGGER.debug("MainEventBusProcessor: Distributed {} to {} children for task {}", new Object[]{eventToDistribute.getClass().getSimpleName(), childCount, taskId});
            LOGGER.debug("MainEventBusProcessor: Completed processing event for task {}", (Object)taskId);
        }
        finally {
            try {
                if (eventToDistribute != null) {
                    this.callback.onEventProcessed(taskId, (Event)eventToDistribute);
                    if (eventToDistribute == event && this.isFinalEvent(event)) {
                        this.callback.onTaskFinalized(taskId);
                    }
                }
            }
            finally {
                mainQueue.releaseSemaphore();
            }
        }
    }

    private boolean updateTaskStore(String taskId, Event event, boolean isReplicated) throws InternalError {
        try {
            String contextId = this.extractContextId(event);
            TaskManager taskManager = new TaskManager(taskId, contextId, this.taskStore, null);
            boolean isFinal = taskManager.process(event, isReplicated);
            LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {} (final: {}, replicated: {})", new Object[]{taskId, event.getClass().getSimpleName(), isFinal, isReplicated});
            return isFinal;
        }
        catch (InternalError e) {
            LOGGER.error("Error updating TaskStore via TaskManager for task {}", (Object)taskId, (Object)e);
            throw e;
        }
        catch (Exception e) {
            LOGGER.error("Unexpected error updating TaskStore for task {}", (Object)taskId, (Object)e);
            throw new InternalError("TaskStore persistence failed: " + e.getMessage());
        }
    }

    private void sendPushNotification(String taskId, StreamingEventKind event) {
        Runnable pushTask = () -> {
            try {
                if (event != null) {
                    LOGGER.debug("Sending push notification for task {}", (Object)taskId);
                    this.pushSender.sendNotification(event);
                } else {
                    LOGGER.debug("Skipping push notification - event is null for task {}", (Object)taskId);
                }
            }
            catch (Exception e) {
                LOGGER.error("Error sending push notification for task {}", (Object)taskId, (Object)e);
            }
        };
        if (this.pushNotificationExecutor != null) {
            this.pushNotificationExecutor.execute(pushTask);
        } else {
            CompletableFuture.runAsync(pushTask);
        }
    }

    @Nullable
    private String extractContextId(Event event) {
        if (event instanceof Task) {
            Task task = (Task)event;
            return task.contextId();
        }
        if (event instanceof TaskStatusUpdateEvent) {
            TaskStatusUpdateEvent statusUpdate = (TaskStatusUpdateEvent)event;
            return statusUpdate.contextId();
        }
        if (event instanceof TaskArtifactUpdateEvent) {
            TaskArtifactUpdateEvent artifactUpdate = (TaskArtifactUpdateEvent)event;
            return artifactUpdate.contextId();
        }
        return null;
    }

    private boolean isFinalEvent(Event event) {
        if (event instanceof Task) {
            Task task = (Task)event;
            return task.status() != null && task.status().state() != null && task.status().state().isFinal();
        }
        if (event instanceof TaskStatusUpdateEvent) {
            TaskStatusUpdateEvent statusUpdate = (TaskStatusUpdateEvent)event;
            return statusUpdate.isFinal();
        }
        return event instanceof A2AError;
    }
}

