package io.seata.core.rpc.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.exception.FrameworkException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.MergeMessage;
import io.seata.core.protocol.MessageFuture;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.rpc.Disposable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/seata/core/rpc/netty/AbstractRpcRemoting.class */
/**
 * Abstract Netty duplex handler that implements the request/response bookkeeping of the RPC layer.
 *
 * <p>The class keeps a table of in-flight requests ({@link #futures}), expires them from a
 * single-threaded timer started by {@link #init()}, provides send helpers that wait for the
 * channel to become writable, and hands inbound {@link RpcMessage}s to {@link #messageExecutor},
 * which calls {@link #dispatch(long, ChannelHandlerContext, Object)}.
 */
public abstract class AbstractRpcRemoting extends ChannelDuplexHandler implements Disposable {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRpcRemoting.class);

    /** Executor on which inbound messages are dispatched. */
    protected final ThreadPoolExecutor messageExecutor;

    /** Milliseconds waited per retry while a channel is not writable. */
    private static final long NOT_WRITEABLE_CHECK_MILLS = 10;

    /** Initial delay and period, in milliseconds, of the timeout sweep scheduled by {@link #init()}. */
    private static final int TIMEOUT_CHECK_INTERNAL = 3000;

    /** Handlers stored by {@link #setChannelHandlers(ChannelHandler...)}; not read by this class. */
    protected ChannelHandler[] channelHandlers;

    /** Single-threaded scheduler that runs the timeout sweep. */
    protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, (ThreadFactory) new NamedThreadFactory("timeoutChecker", 1, true));

    /** In-flight requests keyed by message id. */
    protected final ConcurrentHashMap<Long, MessageFuture> futures = new ConcurrentHashMap<>();

    /**
     * Per-key queues of requests waiting to be sent. This class only offers into the queues;
     * the consumer is presumably a subclass (not visible here).
     */
    protected final ConcurrentHashMap<String, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();

    /** Monitor notified after a message has been queued in {@link #basketMap}. */
    protected final Object mergeLock = new Object();

    /** Wall-clock time in milliseconds, refreshed at the end of every timeout sweep. */
    protected volatile long nowMills = 0;

    /** Monitor used to wait for, and signal, channel writability. */
    private final Object lock = new Object();

    /** When {@code true}, queuing a message does not notify {@link #mergeLock}. Never written by this class. */
    protected volatile boolean isSending = false;

    private String group = "DEFAULT";

    /** Merge messages sent through {@link #sendRequest(Channel, Object)}, keyed by message id. */
    protected final Map<Long, MergeMessage> mergeMsgMap = new ConcurrentHashMap<>();

    /** When {@code true}, the next executor rejection of a request triggers one jstack attempt. */
    boolean allowDumpStack = false;

    /**
     * Creates the handler.
     *
     * @param threadPoolExecutor executor used to dispatch inbound messages
     */
    public AbstractRpcRemoting(ThreadPoolExecutor threadPoolExecutor) {
        this.messageExecutor = threadPoolExecutor;
    }

    /**
     * Registers this instance with the {@code ShutdownHook} and starts the periodic sweep that
     * expires timed-out futures.
     */
    public void init() {
        ShutdownHook.getInstance().addDisposable(this);
        this.timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    AbstractRpcRemoting.this.clearTimedOutFutures();
                } catch (RuntimeException e) {
                    // An exception escaping a scheduleAtFixedRate task cancels all later runs,
                    // which would leave futures unexpired from then on; log and keep going.
                    AbstractRpcRemoting.LOGGER.error("timeout check failed: " + e.getMessage(), e);
                }
                AbstractRpcRemoting.this.nowMills = System.currentTimeMillis();
            }
        }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
    }

    /** Removes every future that reports a timeout and completes it with a {@code null} result. */
    private void clearTimedOutFutures() {
        ArrayList<MessageFuture> expired = new ArrayList<>(this.futures.size());
        for (MessageFuture candidate : this.futures.values()) {
            if (candidate.isTimeout()) {
                expired.add(candidate);
            }
        }
        for (MessageFuture future : expired) {
            this.futures.remove(Long.valueOf(future.getRequestMessage().getId()));
            future.setResultMessage(null);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("timeout clear future : " + future.getRequestMessage().getBody());
            }
        }
    }

    /** Shuts down the timeout scheduler. */
    @Override
    public void destroy() {
        this.timerExecutor.shutdown();
    }

    /**
     * Wakes up senders blocked in the writability wait once the channel is writable again, then
     * forwards the event along the pipeline.
     *
     * @param channelHandlerContext context of the channel whose writability changed
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) {
        synchronized (this.lock) {
            if (channelHandlerContext.channel().isWritable()) {
                this.lock.notifyAll();
            }
        }
        channelHandlerContext.fireChannelWritabilityChanged();
    }

    /**
     * Sends a request and waits for the response using {@code NettyClientConfig.getRpcRequestTimeout()}
     * as the timeout.
     *
     * @param str     basket key; when non-null the message is queued instead of written directly
     * @param channel channel to write to; when {@code null} nothing is sent
     * @param obj     message body
     * @return the value produced by the pending future, or {@code null} when {@code channel} is null
     * @throws TimeoutException if waiting for the response times out
     */
    protected Object sendAsyncRequestWithResponse(String str, Channel channel, Object obj) throws TimeoutException {
        return sendAsyncRequestWithResponse(str, channel, obj, NettyClientConfig.getRpcRequestTimeout());
    }

    /**
     * Sends a request and waits up to {@code j} milliseconds for the response.
     *
     * @param str     basket key; when non-null the message is queued instead of written directly
     * @param channel channel to write to; when {@code null} nothing is sent
     * @param obj     message body
     * @param j       timeout in milliseconds, must be positive
     * @return the value produced by the pending future, or {@code null} when {@code channel} is null
     * @throws TimeoutException   if waiting for the response times out
     * @throws FrameworkException if {@code j} is not positive
     */
    protected Object sendAsyncRequestWithResponse(String str, Channel channel, Object obj, long j) throws TimeoutException {
        if (j <= 0) {
            throw new FrameworkException("timeout should more than 0ms");
        }
        return sendAsyncRequest(str, channel, obj, j);
    }

    /**
     * Sends a request without waiting for a response (timeout {@code 0}).
     *
     * @param str     basket key; when non-null the message is queued instead of written directly
     * @param channel channel to write to; when {@code null} nothing is sent
     * @param obj     message body
     * @return always {@code null}
     * @throws TimeoutException declared for signature compatibility with the waiting variants
     */
    protected Object sendAsyncRequestWithoutResponse(String str, Channel channel, Object obj) throws TimeoutException {
        return sendAsyncRequest(str, channel, obj, 0L);
    }

    /**
     * Builds the request message, registers its future and either queues it in the basket for
     * {@code address} or writes it to the channel; then optionally waits for the result.
     *
     * @param address basket key (logged as "ip"); {@code null} selects the direct write path
     * @param channel target channel
     * @param msg     message body
     * @param timeout milliseconds to wait for the result; values {@code <= 0} mean "do not wait"
     */
    private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout) throws TimeoutException {
        if (channel == null) {
            LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
            return null;
        }
        final RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setId(RpcMessage.getNextMessageId());
        rpcMessage.setAsync(false);
        rpcMessage.setHeartbeat(false);
        rpcMessage.setRequest(true);
        rpcMessage.setBody(msg);

        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeout);
        this.futures.put(Long.valueOf(rpcMessage.getId()), messageFuture);

        if (address != null) {
            BlockingQueue<RpcMessage> basket = this.basketMap.get(address);
            if (basket == null) {
                // putIfAbsent keeps a queue that another thread may have installed concurrently.
                this.basketMap.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
                basket = this.basketMap.get(address);
            }
            basket.offer(rpcMessage);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("offer message: " + rpcMessage.getBody());
            }
            if (!this.isSending) {
                synchronized (this.mergeLock) {
                    this.mergeLock.notifyAll();
                }
            }
        } else {
            channelWriteableCheck(channel, msg);
            channel.writeAndFlush(rpcMessage).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) {
                    if (channelFuture.isSuccess()) {
                        return;
                    }
                    // Failed write: complete the waiting future with the cause and drop the channel.
                    MessageFuture failed = AbstractRpcRemoting.this.futures.remove(Long.valueOf(rpcMessage.getId()));
                    if (failed != null) {
                        failed.setResultMessage(channelFuture.cause());
                    }
                    AbstractRpcRemoting.this.destroyChannel(channelFuture.channel());
                }
            });
        }

        if (timeout <= 0) {
            return null;
        }
        try {
            return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            LOGGER.error("wait response error:" + e.getMessage() + ",ip:" + address + ",request:" + msg);
            if (e instanceof TimeoutException) {
                throw ((TimeoutException) e);
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes {@code obj} to the channel as an asynchronous request; no future is registered.
     * Merge messages are additionally recorded in {@link #mergeMsgMap} under the new message id.
     *
     * @param channel channel to write to
     * @param obj     message body
     * @throws FrameworkException if the channel stays non-writable past the configured retry limit
     */
    protected void sendRequest(Channel channel, Object obj) {
        RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setAsync(true);
        rpcMessage.setHeartbeat(obj instanceof HeartbeatMessage);
        rpcMessage.setRequest(true);
        rpcMessage.setBody(obj);
        rpcMessage.setId(RpcMessage.getNextMessageId());
        if (obj instanceof MergeMessage) {
            this.mergeMsgMap.put(Long.valueOf(rpcMessage.getId()), (MergeMessage) obj);
        }
        channelWriteableCheck(channel, obj);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
        }
        channel.writeAndFlush(rpcMessage);
    }

    /**
     * Writes {@code obj} to the channel as the response to message {@code j}.
     *
     * @param j       id of the request being answered
     * @param channel channel to write to
     * @param obj     response body
     * @throws FrameworkException if the channel stays non-writable past the configured retry limit
     */
    protected void sendResponse(long j, Channel channel, Object obj) {
        RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setAsync(true);
        rpcMessage.setHeartbeat(obj instanceof HeartbeatMessage);
        rpcMessage.setRequest(false);
        rpcMessage.setBody(obj);
        rpcMessage.setId(j);
        channelWriteableCheck(channel, obj);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("send response:" + rpcMessage.getBody() + ",channel:" + channel);
        }
        channel.writeAndFlush(rpcMessage);
    }

    /**
     * Blocks until {@code channel} is writable, waiting {@link #NOT_WRITEABLE_CHECK_MILLS} per
     * attempt. After more than {@code NettyClientConfig.getMaxNotWriteableRetry()} attempts the
     * channel is destroyed and a {@link FrameworkException} is thrown.
     *
     * <p>An interrupt received while waiting is logged and does not abort the wait; the thread's
     * interrupt status is restored before this method returns or throws.
     */
    private void channelWriteableCheck(Channel channel, Object msg) {
        int tryTimes = 0;
        boolean interrupted = false;
        try {
            synchronized (this.lock) {
                while (!channel.isWritable()) {
                    tryTimes++;
                    if (tryTimes > NettyClientConfig.getMaxNotWriteableRetry()) {
                        destroyChannel(channel);
                        throw new FrameworkException("msg:" + (msg == null ? "null" : msg.toString()), FrameworkErrorCode.ChannelIsNotWritable);
                    }
                    try {
                        this.lock.wait(NOT_WRITEABLE_CHECK_MILLS);
                    } catch (InterruptedException e) {
                        LOGGER.error(e.getMessage());
                        // Defer re-interrupting: doing it here would make every following wait()
                        // throw immediately and burn through the retry budget.
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Handles an inbound message. Objects that are not {@link RpcMessage}s are ignored.
     * A response completes its pending future when one is registered; responses without a future
     * and all requests are dispatched on {@link #messageExecutor}.
     *
     * @param channelHandlerContext context the message arrived on
     * @param obj                   the inbound object
     */
    @Override
    public void channelRead(final ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof RpcMessage)) {
            return;
        }
        final RpcMessage rpcMessage = (RpcMessage) obj;

        if (rpcMessage.isRequest()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("%s msgId:%s, body:%s", this, Long.valueOf(rpcMessage.getId()), rpcMessage.getBody()));
            }
            if (!submitDispatch(channelHandlerContext, rpcMessage) && this.allowDumpStack) {
                dumpStackOnce();
            }
            return;
        }

        MessageFuture pending = this.futures.remove(Long.valueOf(rpcMessage.getId()));
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, future :%s, body:%s", this, Long.valueOf(rpcMessage.getId()), pending, rpcMessage.getBody()));
        }
        if (pending != null) {
            pending.setResultMessage(rpcMessage.getBody());
            return;
        }
        submitDispatch(channelHandlerContext, rpcMessage);
    }

    /**
     * Submits the dispatch of {@code rpcMessage} to {@link #messageExecutor}.
     *
     * @return {@code false} when the executor rejected the task (the rejection is logged)
     */
    private boolean submitDispatch(final ChannelHandlerContext ctx, final RpcMessage rpcMessage) {
        try {
            this.messageExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        AbstractRpcRemoting.this.dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());
                    } catch (Throwable th) {
                        AbstractRpcRemoting.LOGGER.error(FrameworkErrorCode.NetDispatch.errCode + " " + th.getMessage(), th);
                    }
                }
            });
            return true;
        } catch (RejectedExecutionException e) {
            // The number logged is the executor's active thread count.
            LOGGER.error("{} thread pool is full, current max pool size is {}", FrameworkErrorCode.ThreadPoolFull.errCode, this.messageExecutor.getActiveCount());
            return false;
        }
    }

    /** Attempts a single jstack invocation for this JVM, then clears {@link #allowDumpStack}. */
    private void dumpStackOnce() {
        try {
            // The part of the runtime name before '@' is presumably the process id.
            String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
            // NOTE(review): Runtime.exec does not go through a shell, so ">d:/N.log" is handed to
            // jstack as an argument instead of redirecting its output; the path is also
            // Windows-specific. Left as-is pending a decision on where dumps should go.
            Runtime.getRuntime().exec("jstack " + pid + " >d:/" + new Random().nextInt(100) + ".log");
        } catch (IOException e) {
            LOGGER.error(e.getMessage());
        }
        this.allowDumpStack = false;
    }

    /**
     * Logs the failure and destroys the channel it occurred on.
     *
     * @param channelHandlerContext context of the failing channel
     * @param th                    the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        LOGGER.error(FrameworkErrorCode.ExceptionCaught.errCode + " " + channelHandlerContext.channel() + " connect exception. " + th.getMessage(), th);
        try {
            destroyChannel(channelHandlerContext.channel());
        } catch (Exception e) {
            LOGGER.error("close channel" + channelHandlerContext.channel() + " fail.", e);
        }
    }

    /**
     * Processes a message on the message executor.
     *
     * @param j                     id of the message
     * @param channelHandlerContext context the message arrived on
     * @param obj                   body of the message
     */
    public abstract void dispatch(long j, ChannelHandlerContext channelHandlerContext, Object obj);

    /**
     * Logs the upcoming close at info level and delegates to the superclass.
     *
     * @param channelHandlerContext context being closed
     * @param channelPromise        promise passed on to the superclass
     */
    @Override
    public void close(ChannelHandlerContext channelHandlerContext, ChannelPromise channelPromise) throws Exception {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(channelHandlerContext + " will closed");
        }
        super.close(channelHandlerContext, channelPromise);
    }

    /**
     * Appends the handlers to the end of the channel's pipeline; does nothing when either
     * argument is {@code null}.
     *
     * @param channel            channel whose pipeline is extended
     * @param channelHandlerArr  handlers to append
     */
    protected void addChannelPipelineLast(Channel channel, ChannelHandler... channelHandlerArr) {
        if (null == channel || null == channelHandlerArr) {
            return;
        }
        channel.pipeline().addLast(channelHandlerArr);
    }

    /**
     * Stores the given handlers in {@link #channelHandlers}.
     *
     * @param channelHandlerArr handlers to store
     */
    protected void setChannelHandlers(ChannelHandler... channelHandlerArr) {
        this.channelHandlers = channelHandlerArr;
    }

    /**
     * @return the group name, {@code "DEFAULT"} unless changed
     */
    public String getGroup() {
        return this.group;
    }

    /**
     * @param str the new group name
     */
    public void setGroup(String str) {
        this.group = str;
    }

    /**
     * Destroys the channel, deriving the address argument from the channel's remote address.
     *
     * @param channel channel to destroy
     */
    public void destroyChannel(Channel channel) {
        destroyChannel(getAddressFromChannel(channel), channel);
    }

    /**
     * Destroys the channel associated with the given address.
     *
     * @param str     address as produced by {@link #getAddressFromChannel(Channel)}
     * @param channel channel to destroy
     */
    public abstract void destroyChannel(String str, Channel channel);

    /**
     * @param channelHandlerContext context whose channel address is wanted
     * @return the address of the context's channel, see {@link #getAddressFromChannel(Channel)}
     */
    protected String getAddressFromContext(ChannelHandlerContext channelHandlerContext) {
        return getAddressFromChannel(channelHandlerContext.channel());
    }

    /**
     * Returns the textual remote address of the channel with the leading
     * {@code NettyClientConfig.getSocketAddressStartChar()} prefix removed when present.
     *
     * @param channel channel to inspect
     * @return the remote address text without the configured prefix
     */
    protected String getAddressFromChannel(Channel channel) {
        SocketAddress remoteAddress = channel.remoteAddress();
        String address = remoteAddress.toString();
        if (address.indexOf(NettyClientConfig.getSocketAddressStartChar()) == 0) {
            address = address.substring(NettyClientConfig.getSocketAddressStartChar().length());
        }
        return address;
    }
}
