/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.client.transport.rest.sse;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import io.a2a.client.transport.rest.RestErrorMapper;
import io.a2a.grpc.MessageOrBuilder;
import io.a2a.grpc.StreamResponse;
import io.a2a.grpc.TaskArtifactUpdateEventOrBuilder;
import io.a2a.grpc.TaskOrBuilder;
import io.a2a.grpc.TaskStatusUpdateEventOrBuilder;
import io.a2a.grpc.utils.ProtoUtils;
import io.a2a.spec.Message;
import io.a2a.spec.StreamingEventKind;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.logging.Logger;
import org.jspecify.annotations.Nullable;

/**
 * Listener that turns streamed JSON payloads into {@link StreamingEventKind}
 * events and forwards them, or any failure, to caller-supplied handlers.
 *
 * <p>Each incoming payload is parsed as a JSON-encoded {@link StreamResponse};
 * the populated payload case is converted with {@link ProtoUtils.FromProto}
 * and handed to the event handler. Parse failures and unrecognised payload
 * cases are reported to the error handler instead.
 */
public class RestSSEEventListener {
    private static final Logger log = Logger.getLogger(RestSSEEventListener.class.getName());
    private final Consumer<StreamingEventKind> eventHandler;
    private final Consumer<Throwable> errorHandler;

    /**
     * Creates a listener that dispatches to the given handlers.
     *
     * @param eventHandler receives every successfully converted event
     * @param errorHandler receives failures; a {@code null} handler is tolerated,
     *                     in which case failures are not forwarded
     */
    public RestSSEEventListener(Consumer<StreamingEventKind> eventHandler, Consumer<Throwable> errorHandler) {
        this.eventHandler = eventHandler;
        this.errorHandler = errorHandler;
    }

    /**
     * Parses one streamed JSON payload and dispatches the resulting event.
     *
     * <p>If the payload cannot be parsed as a {@link StreamResponse}, the raw
     * payload is passed to {@link RestErrorMapper#mapRestError} with status
     * {@code 500} and the result is reported to the error handler.
     *
     * @param message           the raw JSON payload
     * @param completableFuture handle associated with the stream; not used by this method
     */
    public void onMessage(String message, @Nullable Future<Void> completableFuture) {
        try {
            // Supplier overload: the string is only built when FINE is enabled.
            log.fine(() -> "Streaming message received: " + message);
            StreamResponse.Builder builder = StreamResponse.newBuilder();
            // No cast here: the simple name "Message" is ambiguous in this file
            // (both protobuf's and the spec's Message are imported).
            JsonFormat.parser().merge(message, builder);
            this.handleMessage(builder.build());
        }
        catch (InvalidProtocolBufferException e) {
            // Keep a trace of why parsing failed instead of dropping the exception silently.
            log.fine(() -> "Failed to parse streaming message as StreamResponse: " + e.getMessage());
            this.reportError((Throwable)RestErrorMapper.mapRestError(message, 500));
        }
    }

    /**
     * Reports a failure to the error handler and cancels the given future.
     *
     * @param throwable the failure to report
     * @param future    handle to cancel (with interruption) once the error has been
     *                  reported; may be {@code null}
     */
    public void onError(Throwable throwable, @Nullable Future<Void> future) {
        this.reportError(throwable);
        if (future != null) {
            future.cancel(true);
        }
    }

    /**
     * Converts the populated payload of {@code response} into its spec-level
     * event and passes it to the event handler. An unset or unrecognised
     * payload case is logged and reported to the error handler instead.
     */
    private void handleMessage(StreamResponse response) {
        // Typed as the common event type: the branches produce different event classes.
        StreamingEventKind event;
        switch (response.getPayloadCase()) {
            case MSG: {
                event = ProtoUtils.FromProto.message((MessageOrBuilder)response.getMsg());
                break;
            }
            case TASK: {
                event = ProtoUtils.FromProto.task((TaskOrBuilder)response.getTask());
                break;
            }
            case STATUS_UPDATE: {
                event = ProtoUtils.FromProto.taskStatusUpdateEvent((TaskStatusUpdateEventOrBuilder)response.getStatusUpdate());
                break;
            }
            case ARTIFACT_UPDATE: {
                event = ProtoUtils.FromProto.taskArtifactUpdateEvent((TaskArtifactUpdateEventOrBuilder)response.getArtifactUpdate());
                break;
            }
            default: {
                log.warning("Invalid stream response " + response.getPayloadCase());
                this.reportError(new IllegalStateException("Invalid stream response from server: " + response.getPayloadCase()));
                return;
            }
        }
        this.eventHandler.accept(event);
    }

    /** Forwards {@code error} to the error handler when one was supplied. */
    private void reportError(Throwable error) {
        if (this.errorHandler != null) {
            this.errorHandler.accept(error);
        }
    }
}

