/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.io.netty.manager;

import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.events.io.ChannelClosedProactivelyEvent;
import com.couchbase.client.core.cnc.events.io.IdleStreamingEndpointClosedEvent;
import com.couchbase.client.core.cnc.events.io.InvalidRequestDetectedEvent;
import com.couchbase.client.core.cnc.events.io.UnsupportedResponseTypeReceivedEvent;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.core.deps.io.netty.channel.ChannelDuplexHandler;
import com.couchbase.client.core.deps.io.netty.channel.ChannelHandler;
import com.couchbase.client.core.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.core.deps.io.netty.channel.ChannelPromise;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.FullHttpRequest;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpContent;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpHeaderNames;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpResponse;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.LastHttpContent;
import com.couchbase.client.core.deps.io.netty.handler.timeout.IdleStateEvent;
import com.couchbase.client.core.deps.io.netty.handler.timeout.IdleStateHandler;
import com.couchbase.client.core.deps.io.netty.util.ReferenceCountUtil;
import com.couchbase.client.core.endpoint.BaseEndpoint;
import com.couchbase.client.core.io.IoContext;
import com.couchbase.client.core.msg.Response;
import com.couchbase.client.core.msg.manager.BucketConfigStreamingRequest;
import com.couchbase.client.core.msg.manager.BucketConfigStreamingResponse;
import com.couchbase.client.core.msg.manager.ManagerRequest;
import com.couchbase.client.core.retry.RetryOrchestrator;
import com.couchbase.client.core.retry.RetryReason;
import com.couchbase.client.core.service.ServiceType;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

@ChannelHandler.Sharable
public class ManagerMessageHandler
extends ChannelDuplexHandler {
    private final CoreContext coreContext;
    private IoContext ioContext;
    private ManagerRequest<Response> currentRequest;
    private BucketConfigStreamingResponse streamingResponse;
    private String remoteHost;
    private ByteBuf currentContent;
    private HttpResponse currentResponse;
    private final EventBus eventBus;
    private final BaseEndpoint endpoint;

    public ManagerMessageHandler(BaseEndpoint endpoint, CoreContext coreContext) {
        this.endpoint = endpoint;
        this.coreContext = coreContext;
        this.eventBus = coreContext.environment().eventBus();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        this.currentContent = ctx.alloc().buffer();
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        this.currentContent.release();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        this.ioContext = new IoContext(this.coreContext, ctx.channel().localAddress(), ctx.channel().remoteAddress(), Optional.empty());
        this.remoteHost = this.endpoint.remoteHostname() + ":" + this.endpoint.remotePort();
        ctx.fireChannelActive();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (msg instanceof ManagerRequest) {
            block7: {
                if (this.currentRequest != null) {
                    RetryOrchestrator.maybeRetry(this.coreContext, (ManagerRequest)msg, RetryReason.NOT_PIPELINED_REQUEST_IN_FLIGHT);
                    if (this.endpoint != null) {
                        this.endpoint.decrementOutstandingRequests();
                    }
                    return;
                }
                try {
                    this.currentRequest = (ManagerRequest)msg;
                    FullHttpRequest encoded = (FullHttpRequest)this.currentRequest.encode();
                    encoded.headers().set((CharSequence)HttpHeaderNames.HOST, (Object)this.remoteHost);
                    encoded.headers().set((CharSequence)HttpHeaderNames.USER_AGENT, (Object)this.endpoint.context().environment().userAgent().formattedLong());
                    ctx.writeAndFlush(encoded);
                }
                catch (Throwable t) {
                    this.currentRequest.response().completeExceptionally(t);
                    if (this.endpoint == null) break block7;
                    this.endpoint.decrementOutstandingRequests();
                }
            }
            this.currentContent.clear();
        } else {
            if (this.endpoint != null) {
                this.endpoint.decrementOutstandingRequests();
            }
            this.eventBus.publish(new InvalidRequestDetectedEvent(this.ioContext, ServiceType.MANAGER, msg));
            ctx.channel().close().addListener(f -> this.eventBus.publish(new ChannelClosedProactivelyEvent(this.ioContext, ChannelClosedProactivelyEvent.Reason.INVALID_REQUEST_DETECTED)));
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpResponse) {
            this.currentResponse = (HttpResponse)msg;
            if (this.isStreamingConfigRequest()) {
                this.streamingResponse = (BucketConfigStreamingResponse)this.currentRequest.decode(this.currentResponse, null);
                this.currentRequest.succeed(this.streamingResponse);
                ctx.pipeline().addFirst(new IdleStateHandler(this.coreContext.environment().ioConfig().configIdleRedialTimeout().toMillis(), 0L, 0L, TimeUnit.MILLISECONDS));
            }
        } else if (msg instanceof HttpContent) {
            this.currentContent.writeBytes(((HttpContent)msg).content());
            if (this.isStreamingConfigRequest()) {
                String encodedConfig;
                int separatorIndex;
                while ((separatorIndex = (encodedConfig = this.currentContent.toString(StandardCharsets.UTF_8)).indexOf("\n\n\n\n")) >= 0) {
                    String content = encodedConfig.substring(0, separatorIndex);
                    this.streamingResponse.pushConfig(content.trim());
                    this.currentContent.clear();
                    this.currentContent.writeBytes(encodedConfig.substring(separatorIndex + 4).getBytes(StandardCharsets.UTF_8));
                }
            }
            if (msg instanceof LastHttpContent) {
                if (this.isStreamingConfigRequest()) {
                    this.streamingResponse.completeStream();
                    this.streamingResponse = null;
                    ctx.pipeline().remove(IdleStateHandler.class);
                } else {
                    byte[] copy = new byte[this.currentContent.readableBytes()];
                    this.currentContent.readBytes(copy);
                    Response response = this.currentRequest.decode(this.currentResponse, copy);
                    this.currentRequest.succeed(response);
                }
                this.currentRequest = null;
                if (this.endpoint != null) {
                    this.endpoint.markRequestCompletion();
                }
            }
        } else {
            this.ioContext.environment().eventBus().publish(new UnsupportedResponseTypeReceivedEvent(this.ioContext, msg));
        }
        ReferenceCountUtil.release(msg);
    }

    private boolean isStreamingConfigRequest() {
        return BucketConfigStreamingRequest.class.isAssignableFrom(this.currentRequest.getClass());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (this.streamingResponse != null) {
            this.streamingResponse.completeStream();
        }
        if (this.currentRequest != null) {
            RetryOrchestrator.maybeRetry(this.ioContext, this.currentRequest, RetryReason.CHANNEL_CLOSED_WHILE_IN_FLIGHT);
        }
        ctx.fireChannelInactive();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            this.endpoint.disconnect();
            this.coreContext.environment().eventBus().publish(new IdleStreamingEndpointClosedEvent(this.ioContext));
        } else {
            ctx.fireUserEventTriggered(evt);
        }
    }
}

