/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.client.transport.rest.sse;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import io.a2a.client.transport.rest.RestErrorMapper;
import io.a2a.client.transport.spi.sse.AbstractSSEEventListener;
import io.a2a.grpc.MessageOrBuilder;
import io.a2a.grpc.StreamResponse;
import io.a2a.grpc.TaskArtifactUpdateEventOrBuilder;
import io.a2a.grpc.TaskOrBuilder;
import io.a2a.grpc.TaskStatusUpdateEventOrBuilder;
import io.a2a.grpc.utils.ProtoUtils;
import io.a2a.spec.Message;
import io.a2a.spec.StreamingEventKind;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.logging.Logger;
import org.jspecify.annotations.Nullable;

/**
 * Server-sent-events listener for the REST transport.
 *
 * <p>Each incoming payload is parsed as the JSON form of a {@link StreamResponse}, converted to
 * the matching {@link StreamingEventKind} via {@link ProtoUtils.FromProto}, and passed to
 * {@code handleEvent} inherited from {@link AbstractSSEEventListener}. Failures are reported to
 * the optional error handler returned by {@code getErrorHandler()}.
 *
 * <p>NOTE(review): this file imports both {@code com.google.protobuf.Message} and
 * {@code io.a2a.spec.Message}; the class body deliberately avoids the simple name
 * {@code Message} so that it never depends on which of the two is meant.
 */
public class SSEEventListener
extends AbstractSSEEventListener {
    private static final Logger log = Logger.getLogger(SSEEventListener.class.getName());

    /**
     * Creates a listener that delegates to the given callbacks.
     *
     * @param eventHandler callback passed through to the superclass for streamed events
     * @param errorHandler callback passed through to the superclass for failures; may be {@code null}
     */
    public SSEEventListener(Consumer<StreamingEventKind> eventHandler, @Nullable Consumer<Throwable> errorHandler) {
        super(eventHandler, errorHandler);
    }

    /**
     * Handles one raw SSE payload.
     *
     * <p>If the payload cannot be parsed as a {@link StreamResponse}, the raw text is handed to
     * {@link RestErrorMapper#mapRestError} with status 500 and the result is reported to the error
     * handler (when one is configured); the event handler is not invoked in that case.
     *
     * @param message raw JSON text of the SSE data field
     * @param completableFuture future forwarded unchanged to {@code handleEvent}; may be {@code null}
     */
    public void onMessage(String message, @Nullable Future<Void> completableFuture) {
        log.fine("Streaming message received: " + message);
        StreamResponse.Builder builder = StreamResponse.newBuilder();
        try {
            // StreamResponse.Builder is already a protobuf message builder, so no cast is needed.
            JsonFormat.parser().merge(message, builder);
        }
        catch (InvalidProtocolBufferException e) {
            // Keep a trace of why parsing failed instead of silently discarding the exception.
            log.fine("Failed to parse streaming message as StreamResponse: " + e.getMessage());
            // Presumably an unparsable payload is an error body sent by the server, hence the
            // mapping through RestErrorMapper - TODO confirm against RestErrorMapper.
            this.reportError(RestErrorMapper.mapRestError(message, 500));
            return;
        }
        this.parseAndHandleMessage(builder.build(), completableFuture);
    }

    /**
     * Converts the populated payload of {@code response} to its spec-level event and forwards it
     * to {@code handleEvent}. A response whose payload case is none of the four handled cases is
     * logged and reported to the error handler as an {@link IllegalStateException}.
     *
     * @param response parsed stream response
     * @param future future forwarded unchanged to {@code handleEvent}; may be {@code null}
     */
    private void parseAndHandleMessage(StreamResponse response, @Nullable Future<Void> future) {
        // Typed as the common event interface: the branches produce different concrete event types.
        StreamingEventKind event;
        switch (response.getPayloadCase()) {
            case MESSAGE: {
                event = ProtoUtils.FromProto.message((MessageOrBuilder)response.getMessage());
                break;
            }
            case TASK: {
                event = ProtoUtils.FromProto.task((TaskOrBuilder)response.getTask());
                break;
            }
            case STATUS_UPDATE: {
                event = ProtoUtils.FromProto.taskStatusUpdateEvent((TaskStatusUpdateEventOrBuilder)response.getStatusUpdate());
                break;
            }
            case ARTIFACT_UPDATE: {
                event = ProtoUtils.FromProto.taskArtifactUpdateEvent((TaskArtifactUpdateEventOrBuilder)response.getArtifactUpdate());
                break;
            }
            default: {
                log.warning("Invalid stream response " + response.getPayloadCase());
                this.reportError(new IllegalStateException("Invalid stream response from server: " + response.getPayloadCase()));
                return;
            }
        }
        this.handleEvent(event, future);
    }

    /**
     * Passes {@code error} to the configured error handler, if any.
     *
     * @param error failure to report
     */
    private void reportError(Throwable error) {
        // Read the handler once so the null check and the call act on the same reference.
        Consumer<? super Throwable> handler = this.getErrorHandler();
        if (handler != null) {
            handler.accept(error);
        }
    }
}

