/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.remoting.transport.dispatcher.connection;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.ExecutionException;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable;
import org.apache.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;

public class ConnectionOrderedChannelHandler
extends WrappedChannelHandler {
    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter("threadname", "Dubbo");
        this.connectionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(url.getPositiveParameter("connect.queue.capacity", Integer.MAX_VALUE)), (ThreadFactory)new NamedThreadFactory(threadName, true), (RejectedExecutionHandler)new AbortPolicyWithReport(threadName, url));
        this.queuewarninglimit = url.getParameter("connect.queue.warning.size", 1000);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            this.checkQueueLength();
            this.connectionExecutor.execute(new ChannelEventRunnable(channel, this.handler, ChannelEventRunnable.ChannelState.CONNECTED));
        }
        catch (Throwable t) {
            throw new ExecutionException((Object)"connect event", channel, this.getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            this.checkQueueLength();
            this.connectionExecutor.execute(new ChannelEventRunnable(channel, this.handler, ChannelEventRunnable.ChannelState.DISCONNECTED));
        }
        catch (Throwable t) {
            throw new ExecutionException((Object)"disconnected event", channel, this.getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = this.getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, this.handler, ChannelEventRunnable.ChannelState.RECEIVED, message));
        }
        catch (Throwable t) {
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                this.sendFeedback(channel, (Request)message, t);
                return;
            }
            throw new ExecutionException(message, channel, this.getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = this.getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, this.handler, ChannelEventRunnable.ChannelState.CAUGHT, exception));
        }
        catch (Throwable t) {
            throw new ExecutionException((Object)"caught event", channel, this.getClass() + " error when process caught event .", t);
        }
    }

    private void checkQueueLength() {
        if (this.connectionExecutor.getQueue().size() > this.queuewarninglimit) {
            logger.warn((Throwable)new IllegalThreadStateException("connectionordered channel handler `queue size: " + this.connectionExecutor.getQueue().size() + " exceed the warning limit number :" + this.queuewarninglimit));
        }
    }
}

