/*
 * Decompiled with CFR 0.152.
 */
package com.embabel.agent.a2a.server.support;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.a2a.spec.Message;
import io.a2a.spec.SendStreamingMessageResponse;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
import io.a2a.spec.TaskArtifactUpdateEvent;
import io.a2a.spec.TaskStatusUpdateEvent;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import kotlin.Metadata;
import kotlin.NoWhenBranchMatchedException;
import kotlin.Pair;
import kotlin.TuplesKt;
import kotlin.collections.MapsKt;
import kotlin.jvm.internal.Intrinsics;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Profile;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Service
@Profile(value={"a2a"})
@Metadata(mv={2, 1, 0}, k=1, xi=48, d1={"\u0000>\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0003\b\u0017\u0018\u00002\u00020\u0001B\u000f\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u00a2\u0006\u0004\b\u0004\u0010\u0005J\u0010\u0010\u0010\u001a\u00020\r2\u0006\u0010\u0011\u001a\u00020\fH\u0016J\u0018\u0010\u0012\u001a\u00020\u00132\u0006\u0010\u0011\u001a\u00020\f2\u0006\u0010\u0014\u001a\u00020\u0015H\u0016J\u0010\u0010\u0016\u001a\u00020\u00132\u0006\u0010\u0011\u001a\u00020\fH\u0016J\b\u0010\u0017\u001a\u00020\u0013H\u0016R\u000e\u0010\u0002\u001a\u00020\u0003X\u0092\u0004\u00a2\u0006\u0002\n\u0000R\u0018\u0010\u0006\u001a\n \b*\u0004\u0018\u00010\u00070\u0007X\u0092\u0004\u00a2\u0006\u0004\n\u0002\u0010\tR\u001a\u0010\n\u001a\u000e\u0012\u0004\u0012\u00020\f\u0012\u0004\u0012\u00020\r0\u000bX\u0092\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u000e\u001a\u00020\u000fX\u0092\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u0018"}, d2={"Lcom/embabel/agent/a2a/server/support/A2AStreamingHandler;", "", "objectMapper", "Lcom/fasterxml/jackson/databind/ObjectMapper;", "<init>", "(Lcom/fasterxml/jackson/databind/ObjectMapper;)V", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "Lorg/slf4j/Logger;", "activeStreams", "Ljava/util/concurrent/ConcurrentHashMap;", "", "Lorg/springframework/web/servlet/mvc/method/annotation/SseEmitter;", "scheduler", "Ljava/util/concurrent/ScheduledExecutorService;", "createStream", "streamId", "sendStreamEvent", "", "event", "Lio/a2a/spec/StreamingEventKind;", "closeStream", "shutdown", "embabel-agent-a2a"})
public class A2AStreamingHandler {
    @NotNull
    private final ObjectMapper objectMapper;
    private final Logger logger;
    @NotNull
    private final ConcurrentHashMap<String, SseEmitter> activeStreams;
    @NotNull
    private final ScheduledExecutorService scheduler;

    public A2AStreamingHandler(@NotNull ObjectMapper objectMapper) {
        Intrinsics.checkNotNullParameter((Object)objectMapper, (String)"objectMapper");
        this.objectMapper = objectMapper;
        this.logger = LoggerFactory.getLogger(A2AStreamingHandler.class);
        this.activeStreams = new ConcurrentHashMap();
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        Intrinsics.checkNotNullExpressionValue((Object)scheduledExecutorService, (String)"newScheduledThreadPool(...)");
        this.scheduler = scheduledExecutorService;
    }

    @NotNull
    public SseEmitter createStream(@NotNull String streamId) {
        Intrinsics.checkNotNullParameter((Object)streamId, (String)"streamId");
        this.logger.info("Creating SSE stream for streamId: {}", (Object)streamId);
        SseEmitter emitter = new SseEmitter(Long.valueOf(Long.MAX_VALUE));
        ((Map)this.activeStreams).put(streamId, emitter);
        emitter.onCompletion(() -> A2AStreamingHandler.createStream$lambda$0(this, streamId));
        emitter.onTimeout(() -> A2AStreamingHandler.createStream$lambda$1(this, streamId));
        try {
            emitter.send(SseEmitter.event().name("connected").data((Object)MapsKt.mapOf((Pair)TuplesKt.to((Object)"streamId", (Object)streamId))));
        }
        catch (Exception e) {
            this.logger.error("Error sending initial event", (Throwable)e);
            emitter.completeWithError((Throwable)e);
        }
        return emitter;
    }

    public void sendStreamEvent(@NotNull String streamId, @NotNull StreamingEventKind event) {
        Intrinsics.checkNotNullParameter((Object)streamId, (String)"streamId");
        Intrinsics.checkNotNullParameter((Object)event, (String)"event");
        SseEmitter sseEmitter = this.activeStreams.get(streamId);
        if (sseEmitter == null) {
            A2AStreamingHandler $this$sendStreamEvent_u24lambda_u242 = this;
            boolean bl = false;
            $this$sendStreamEvent_u24lambda_u242.logger.warn("No active stream found for streamId: {}", (Object)streamId);
            return;
        }
        SseEmitter emitter = sseEmitter;
        try {
            SseEmitter.SseEventBuilder sseEventBuilder;
            StreamingEventKind streamingEventKind = event;
            if (streamingEventKind instanceof Message) {
                sseEventBuilder = SseEmitter.event().name("message").data((Object)this.objectMapper.writeValueAsString((Object)event), MediaType.APPLICATION_JSON);
            } else if (streamingEventKind instanceof Task) {
                sseEventBuilder = SseEmitter.event().name("task").data((Object)this.objectMapper.writeValueAsString((Object)event), MediaType.APPLICATION_JSON);
            } else if (streamingEventKind instanceof TaskStatusUpdateEvent) {
                sseEventBuilder = SseEmitter.event().name("task-update").data((Object)this.objectMapper.writeValueAsString((Object)new SendStreamingMessageResponse("2.0", (Object)streamId, event, null)), MediaType.APPLICATION_JSON);
            } else if (streamingEventKind instanceof TaskArtifactUpdateEvent) {
                sseEventBuilder = SseEmitter.event().name("task-update").data((Object)this.objectMapper.writeValueAsString((Object)new SendStreamingMessageResponse("2.0", (Object)streamId, event, null)), MediaType.APPLICATION_JSON);
            } else {
                throw new NoWhenBranchMatchedException();
            }
            SseEmitter.SseEventBuilder sseEventBuilder2 = sseEventBuilder;
            Intrinsics.checkNotNull((Object)sseEventBuilder2);
            SseEmitter.SseEventBuilder eventData = sseEventBuilder2;
            emitter.send(eventData);
        }
        catch (Exception e) {
            this.logger.error("Error sending stream event", (Throwable)e);
            emitter.completeWithError((Throwable)e);
        }
    }

    public void closeStream(@NotNull String streamId) {
        block0: {
            SseEmitter emitter;
            Intrinsics.checkNotNullParameter((Object)streamId, (String)"streamId");
            SseEmitter sseEmitter = emitter = this.activeStreams.remove(streamId);
            if (sseEmitter == null) break block0;
            sseEmitter.complete();
        }
    }

    public void shutdown() {
        this.scheduler.shutdown();
        try {
            if (!this.scheduler.awaitTermination(5L, TimeUnit.SECONDS)) {
                this.scheduler.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            this.scheduler.shutdownNow();
        }
    }

    private static final void createStream$lambda$0(A2AStreamingHandler this$0, String $streamId) {
        this$0.logger.info("Stream completed for streamId: {}", (Object)$streamId);
        this$0.activeStreams.remove($streamId);
    }

    private static final void createStream$lambda$1(A2AStreamingHandler this$0, String $streamId) {
        this$0.logger.info("Stream timed out for streamId: {}", (Object)$streamId);
        this$0.activeStreams.remove($streamId);
    }
}

