/*
 * Decompiled with CFR 0.152.
 */
package dev.langchain4j.model.bedrock.internal;

import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.internal.ChatRequestValidationUtils;
import dev.langchain4j.model.ModelProvider;
import dev.langchain4j.model.StreamingResponseHandler;
import dev.langchain4j.model.bedrock.internal.AbstractSharedBedrockChatModel;
import dev.langchain4j.model.bedrock.internal.Json;
import dev.langchain4j.model.chat.StreamingChatModel;
import dev.langchain4j.model.chat.listener.ChatModelListener;
import dev.langchain4j.model.chat.listener.ChatModelRequestContext;
import dev.langchain4j.model.chat.listener.ChatModelResponseContext;
import dev.langchain4j.model.chat.request.ChatRequest;
import dev.langchain4j.model.chat.request.ChatRequestParameters;
import dev.langchain4j.model.chat.request.ResponseFormat;
import dev.langchain4j.model.chat.request.ToolChoice;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.model.chat.response.ChatResponseMetadata;
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
import dev.langchain4j.model.output.Response;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeAsyncClient;
import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeAsyncClientBuilder;
import software.amazon.awssdk.services.bedrockruntime.model.InvokeModelWithResponseStreamRequest;
import software.amazon.awssdk.services.bedrockruntime.model.InvokeModelWithResponseStreamResponseHandler;

public abstract class AbstractBedrockStreamingChatModel
extends AbstractSharedBedrockChatModel
implements StreamingChatModel {
    private static final Logger log = LoggerFactory.getLogger(AbstractBedrockStreamingChatModel.class);
    private volatile BedrockRuntimeAsyncClient asyncClient;

    protected AbstractBedrockStreamingChatModel(AbstractBedrockStreamingChatModelBuilder<?, ?> b) {
        super(b);
        this.asyncClient = b.asyncClient;
    }

    public void chat(ChatRequest chatRequest, final StreamingChatResponseHandler handler) {
        ChatRequestValidationUtils.validateMessages((List)chatRequest.messages());
        ChatRequestParameters parameters = chatRequest.parameters();
        ChatRequestValidationUtils.validateParameters((ChatRequestParameters)parameters);
        ChatRequestValidationUtils.validate((List)parameters.toolSpecifications());
        ChatRequestValidationUtils.validate((ToolChoice)parameters.toolChoice());
        ChatRequestValidationUtils.validate((ResponseFormat)parameters.responseFormat());
        StreamingResponseHandler<AiMessage> legacyHandler = new StreamingResponseHandler<AiMessage>(){

            public void onNext(String token) {
                handler.onPartialResponse(token);
            }

            public void onComplete(Response<AiMessage> response) {
                ChatResponse chatResponse = ChatResponse.builder().aiMessage((AiMessage)response.content()).metadata(ChatResponseMetadata.builder().tokenUsage(response.tokenUsage()).finishReason(response.finishReason()).build()).build();
                handler.onCompleteResponse(chatResponse);
            }

            public void onError(Throwable error) {
                handler.onError(error);
            }
        };
        this.generate(chatRequest.messages(), legacyHandler);
    }

    private void generate(List<ChatMessage> messages, StreamingResponseHandler<AiMessage> handler) {
        InvokeModelWithResponseStreamRequest request = (InvokeModelWithResponseStreamRequest)InvokeModelWithResponseStreamRequest.builder().body(SdkBytes.fromUtf8String((String)this.convertMessagesToAwsBody(messages))).modelId(this.getModelId()).contentType("application/json").accept("application/json").build();
        ChatRequest listenerRequest = this.createListenerRequest(request, messages, Collections.emptyList());
        ConcurrentHashMap attributes = new ConcurrentHashMap();
        ChatModelRequestContext requestContext = new ChatModelRequestContext(listenerRequest, this.provider(), attributes);
        this.listeners.forEach(listener -> {
            try {
                listener.onRequest(requestContext);
            }
            catch (Exception e) {
                log.warn("Exception while calling model listener", (Throwable)e);
            }
        });
        StringBuffer finalCompletion = new StringBuffer();
        InvokeModelWithResponseStreamResponseHandler.Visitor visitor = InvokeModelWithResponseStreamResponseHandler.Visitor.builder().onChunk(chunk -> {
            StreamingResponse sr = Json.fromJson(chunk.bytes().asUtf8String(), StreamingResponse.class);
            finalCompletion.append(sr.completion);
            handler.onNext(sr.completion);
        }).build();
        InvokeModelWithResponseStreamResponseHandler h = ((InvokeModelWithResponseStreamResponseHandler.Builder)((InvokeModelWithResponseStreamResponseHandler.Builder)((InvokeModelWithResponseStreamResponseHandler.Builder)InvokeModelWithResponseStreamResponseHandler.builder().onEventStream(stream -> stream.subscribe(event -> event.accept(visitor)))).onComplete(() -> {
            Response response = Response.from((Object)new AiMessage(finalCompletion.toString()));
            ChatResponse listenerResponse = this.createListenerResponse(null, null, (Response<AiMessage>)response);
            ChatModelResponseContext responseContext = new ChatModelResponseContext(listenerResponse, listenerRequest, this.provider(), attributes);
            this.listeners.forEach(listener -> {
                try {
                    listener.onResponse(responseContext);
                }
                catch (Exception e) {
                    log.warn("Exception while calling model listener", (Throwable)e);
                }
            });
            handler.onComplete(response);
        })).onError(throwable -> {
            this.listenerErrorResponse((Throwable)throwable, listenerRequest, this.provider(), attributes);
            handler.onError(throwable);
        })).build();
        try {
            this.getAsyncClient().invokeModelWithResponseStream(request, h).join();
        }
        catch (RuntimeException e) {
            log.error("Error on bedrock stream request", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public BedrockRuntimeAsyncClient getAsyncClient() {
        if (this.asyncClient == null) {
            AbstractBedrockStreamingChatModel abstractBedrockStreamingChatModel = this;
            synchronized (abstractBedrockStreamingChatModel) {
                if (this.asyncClient == null) {
                    this.asyncClient = this.initAsyncClient();
                }
            }
        }
        return this.asyncClient;
    }

    private BedrockRuntimeAsyncClient initAsyncClient() {
        BedrockRuntimeAsyncClient client = (BedrockRuntimeAsyncClient)((BedrockRuntimeAsyncClientBuilder)((BedrockRuntimeAsyncClientBuilder)((BedrockRuntimeAsyncClientBuilder)BedrockRuntimeAsyncClient.builder().region(this.region)).credentialsProvider(this.credentialsProvider)).overrideConfiguration(c -> c.apiCallTimeout(this.timeout))).build();
        return client;
    }

    public List<ChatModelListener> listeners() {
        return this.listeners;
    }

    public ModelProvider provider() {
        return ModelProvider.AMAZON_BEDROCK;
    }

    public static abstract class AbstractBedrockStreamingChatModelBuilder<C extends AbstractBedrockStreamingChatModel, B extends AbstractBedrockStreamingChatModelBuilder<C, B>>
    extends AbstractSharedBedrockChatModel.AbstractSharedBedrockChatModelBuilder<C, B> {
        private BedrockRuntimeAsyncClient asyncClient;

        public B asyncClient(BedrockRuntimeAsyncClient asyncClient) {
            this.asyncClient = asyncClient;
            return (B)this.self();
        }

        @Override
        protected abstract B self();

        @Override
        public abstract C build();

        @Override
        public String toString() {
            return "AbstractBedrockStreamingChatModel.AbstractBedrockStreamingChatModelBuilder(super=" + super.toString() + ", asyncClient=" + String.valueOf(this.asyncClient) + ")";
        }
    }

    static class StreamingResponse {
        public String completion;

        StreamingResponse() {
        }
    }
}

