/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.AsyncResponseHandler;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdentifier>
extends AbstractService {
    /** Shared logger; the nested request and manager classes log through it as well. */
    private static final Logger LOG = LoggerFactory.getLogger(AsyncPbRpcProxy.class);
    /**
     * Protocol proxies keyed by the string produced by getHostIdentifier(). Removed entries
     * are handed to shutdownProtocolImpl() by the removal listener installed in the constructor.
     */
    private final Cache<String, ProtocolType> hostProxies;
    /** Scheduler that receives every request passed to queueRequest(). */
    private final RequestManager requestManager;
    /** Retry policy forwarded to createProtocolImpl() for each new proxy. */
    private final RetryPolicy retryPolicy;
    /** Socket factory forwarded to createProtocolImpl() for each new proxy. */
    private final SocketFactory socketFactory;
    /** Single-thread executor on which the request manager's run loop is submitted. */
    private final ListeningExecutorService requestManagerExecutor;
    /** Future of the run loop; assigned in serviceStart() and cancelled in serviceStop(). */
    private volatile ListenableFuture<Void> requestManagerFuture;
    /** Token currently in use; replaced through setToken(). May be null. */
    protected Token<TokenType> token;
    /** User name associated with {@link #token}; null exactly when the token is null. */
    protected String tokenUser;

    /**
     * Submits the request manager's run loop to its dedicated executor, remembers the
     * resulting future, and attaches a callback that logs how the loop ended.
     */
    public void serviceStart() {
        ListenableFuture<Void> runLoop = this.requestManagerExecutor.submit(this.requestManager);
        this.requestManagerFuture = runLoop;

        FutureCallback<Void> terminationLogger = new FutureCallback<Void>() {
            @Override
            public void onSuccess(Void result) {
                LOG.info("RequestManager shutdown");
            }

            @Override
            public void onFailure(Throwable t) {
                // serviceStop() cancels the future, so a cancellation is not reported.
                if (t instanceof CancellationException) {
                    return;
                }
                LOG.warn("RequestManager shutdown with error", t);
            }
        };
        Futures.addCallback(runLoop, terminationLogger, MoreExecutors.directExecutor());
    }

    /**
     * Stops the request manager, cancels its run loop if one was started, and shuts down
     * the executor that hosts the loop.
     *
     * <p>The manager is shut down unconditionally: its constructor already calls
     * {@code start()} on the async response handler, so skipping the shutdown when
     * {@code serviceStart()} never ran would leave that handler un-stopped.
     * {@code RequestManager.shutdown()} is guarded by an atomic flag, so calling it more
     * than once is harmless.
     */
    public void serviceStop() {
        this.requestManager.shutdown();
        // Read the volatile field once so the null check and the cancel see the same future.
        ListenableFuture<Void> runLoop = this.requestManagerFuture;
        if (runLoop != null) {
            runLoop.cancel(true);
        }
        this.requestManagerExecutor.shutdown();
    }

    /**
     * Hands a request to the request manager for scheduling.
     *
     * @param request the request to enqueue
     */
    protected final void queueRequest(CallableRequest<?, ?> request) {
        RequestManager manager = this.requestManager;
        manager.queueRequest(request);
    }

    /**
     * Builds the proxy cache, resolves the initial token, and creates the request manager
     * together with the single-thread executor that will host its run loop.
     *
     * @param name service name passed to the {@code AbstractService} constructor
     * @param numThreads size of the request manager's worker pool
     * @param conf configuration used to obtain the default socket factory
     * @param token initial token, or null to run without one
     * @param connectionTimeoutMs maximum total time, in milliseconds, given to the retry policy
     * @param retrySleepMs fixed sleep, in milliseconds, between retries
     * @param expectedNodes when positive, the proxy cache is capped at twice this many entries
     * @param maxPerNode maximum number of concurrently running requests per node
     * @throws RuntimeException wrapping the {@link IOException} if the token cannot be set
     */
    public AsyncPbRpcProxy(String name, int numThreads, Configuration conf, Token<TokenType> token, long connectionTimeoutMs, long retrySleepMs, int expectedNodes, int maxPerNode) {
        super(name);
        // Proxies idle for an hour are evicted; every evicted proxy is shut down explicitly.
        CacheBuilder<String, ProtocolType> cb = CacheBuilder.newBuilder()
            .expireAfterAccess(1L, TimeUnit.HOURS)
            .removalListener(new RemovalListener<String, ProtocolType>() {
                @Override
                public void onRemoval(RemovalNotification<String, ProtocolType> arg) {
                    if (arg == null) {
                        return;
                    }
                    AsyncPbRpcProxy.this.shutdownProtocolImpl(arg.getValue());
                }
            });
        if (expectedNodes > 0) {
            cb.maximumSize((long) (expectedNodes * 2));
        }
        this.hostProxies = cb.build();
        this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
        try {
            // Must run after hostProxies is assigned: setToken() invalidates the cache.
            this.setToken(token);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(connectionTimeoutMs, retrySleepMs, TimeUnit.MILLISECONDS);
        this.requestManager = new RequestManager(numThreads, maxPerNode);
        ExecutorService localExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build());
        this.requestManagerExecutor = MoreExecutors.listeningDecorator(localExecutor);
        // The previous message concatenated the fields with no separators, which made it unreadable.
        LOG.info("Setting up AsyncPbRpcProxy with numThreads={}, retryTime(millis)={}, retrySleep(millis)={}",
            numThreads, connectionTimeoutMs, retrySleepMs);
    }

    /**
     * Replaces the current token unless {@link #tokensAreEqual} reports it unchanged.
     * On replacement all cached proxies are invalidated and the token user is re-derived,
     * falling back to the current UGI's short user name when the token does not yield one.
     *
     * @param newToken the token to install; may be null, which also clears the token user
     * @throws IOException if comparing the tokens fails
     * @throws RuntimeException if the current user cannot be determined for the fallback
     */
    protected void setToken(Token<TokenType> newToken) throws IOException {
        if (this.tokensAreEqual(newToken)) {
            return;
        }
        LOG.info("Setting new token as it's not equal to the old one, new token is: {}", newToken);
        this.hostProxies.invalidateAll();
        this.token = newToken;

        if (newToken == null) {
            this.tokenUser = null;
            return;
        }

        String resolvedUser = this.getTokenUser(newToken);
        if (resolvedUser == null) {
            try {
                resolvedUser = UserGroupInformation.getCurrentUser().getShortUserName();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            LOG.warn("Cannot determine token user from the token; using {}", resolvedUser);
        }
        this.tokenUser = resolvedUser;
    }

    /**
     * Compares the current token with another one by the sequence number of their decoded
     * {@code LlapTokenIdentifier}; a null token is treated as sequence number -1.
     *
     * @param otherToken the token to compare against the current one; may be null
     * @return true when both sequence numbers match
     * @throws IOException if a token identifier cannot be decoded
     */
    protected boolean tokensAreEqual(Token<TokenType> otherToken) throws IOException {
        int oldSeqNumber = -1;
        if (this.token != null) {
            LlapTokenIdentifier currentId = (LlapTokenIdentifier) this.token.decodeIdentifier();
            oldSeqNumber = currentId.getSequenceNumber();
        }

        int newSeqNumber = -1;
        if (otherToken != null) {
            LlapTokenIdentifier otherId = (LlapTokenIdentifier) otherToken.decodeIdentifier();
            newSeqNumber = otherId.getSequenceNumber();
        }

        LOG.debug("Check token equality be sequenceNumber: {} <-> {}", oldSeqNumber, newSeqNumber);
        return oldSeqNumber == newSeqNumber;
    }

    /**
     * Returns the cached protocol proxy for a node, creating it through
     * {@code createProxy} on a cache miss.
     *
     * @param nodeId node whose hostname and port identify the proxy
     * @param nodeToken token to use if a new proxy has to be created; may be null
     * @return the cached or newly created proxy
     * @throws RuntimeException wrapping the {@link ExecutionException} if creation fails
     */
    protected final ProtocolType getProxy(final LlapNodeId nodeId, final Token<TokenType> nodeToken) {
        final String cacheKey = this.getHostIdentifier(nodeId.getHostname(), nodeId.getPort());
        LOG.debug("Getting host proxies for {}", cacheKey);

        // Loader invoked by the cache only when no proxy is present for the key.
        Callable<ProtocolType> loader = new Callable<ProtocolType>() {
            @Override
            public ProtocolType call() throws Exception {
                return AsyncPbRpcProxy.this.createProxy(nodeId, nodeToken);
            }
        };

        try {
            return this.hostProxies.get(cacheKey, loader);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a protocol proxy for a node. Without any token the proxy is created directly
     * with a null UGI; otherwise a remote-user UGI carrying the token is built and the
     * proxy is created inside {@code ugi.doAs}.
     *
     * @param nodeId target node
     * @param nodeToken node-specific token; when null, a copy of the proxy-wide token is used
     * @return the newly created proxy
     * @throws IOException if the current user cannot be determined for the fallback user name
     * @throws AssertionError if a proxy-wide token is set but its user is not
     */
    private ProtocolType createProxy(final LlapNodeId nodeId, Token<TokenType> nodeToken) throws IOException {
        if (nodeToken == null && this.token == null) {
            LOG.debug("Creating a client without a token for {}", nodeId);
            return this.createProtocolImpl(this.getConfig(), nodeId.getHostname(), nodeId.getPort(), null, this.retryPolicy, this.socketFactory);
        }
        if (this.token != null && this.tokenUser == null) {
            throw new AssertionError("Invalid internal state from " + this.token);
        }

        // Prefer the proxy-wide user; otherwise derive one from the node token.
        String ugiUser = this.tokenUser;
        if (ugiUser == null) {
            ugiUser = this.getTokenUser(nodeToken);
        }
        if (ugiUser == null) {
            ugiUser = UserGroupInformation.getCurrentUser().getShortUserName();
            LOG.warn("Cannot determine token user for UGI; using {}", ugiUser);
        }
        final UserGroupInformation ugi = UserGroupInformation.createRemoteUser(ugiUser);

        // Copy the shared token so that setting the service below does not modify it.
        Token<TokenType> effectiveToken = nodeToken;
        if (effectiveToken == null) {
            effectiveToken = new Token<TokenType>(this.token);
        }
        InetSocketAddress nodeAddress = NetUtils.createSocketAddrForHost(nodeId.getHostname(), nodeId.getPort());
        SecurityUtil.setTokenService(effectiveToken, nodeAddress);
        ugi.addToken(effectiveToken);
        LOG.debug("Creating a client for {}; the token is {}", nodeId, effectiveToken);

        PrivilegedAction<ProtocolType> factory = new PrivilegedAction<ProtocolType>() {
            @Override
            public ProtocolType run() {
                return AsyncPbRpcProxy.this.createProtocolImpl(AsyncPbRpcProxy.this.getConfig(), nodeId.getHostname(), nodeId.getPort(), ugi, AsyncPbRpcProxy.this.retryPolicy, AsyncPbRpcProxy.this.socketFactory);
            }
        };
        return ugi.doAs(factory);
    }

    /**
     * Builds the proxy cache key for a host: {@code ip:hostname:port} when the hostname
     * resolves, or {@code hostname:port} when it does not (a warning is logged).
     *
     * @param hostname host name to resolve and embed in the key
     * @param port port embedded in the key
     * @return the cache key
     */
    private String getHostIdentifier(String hostname, int port) {
        String addressPrefix = "";
        try {
            addressPrefix = InetAddress.getByName(hostname).getHostAddress() + ":";
        } catch (UnknownHostException e) {
            // Best effort: fall back to a key without the IP address.
            LOG.warn("Unable to determine IP address for host: {}.. Ignoring..", (Object) hostname, (Object) e);
        }
        return addressPrefix + hostname + ":" + port;
    }

    /**
     * Creates the concrete protocol proxy for one node. Within this class it is invoked
     * from createProxy() with, in order: the service configuration, the node hostname,
     * the node port, the UGI carrying the token (null when no token is in use), the
     * retry policy, and the socket factory.
     *
     * @return the new protocol proxy
     */
    protected abstract ProtocolType createProtocolImpl(Configuration var1, String var2, int var3, UserGroupInformation var4, RetryPolicy var5, SocketFactory var6);

    /**
     * Releases a protocol proxy. Within this class it is called by the proxy cache's
     * removal listener with the value of each removed entry.
     */
    protected abstract void shutdownProtocolImpl(ProtocolType var1);

    /**
     * Extracts the user name associated with a token.
     *
     * @return the user name, or null if it cannot be determined (callers in this class
     *         then fall back to the current UGI's short user name)
     */
    protected abstract String getTokenUser(Token<TokenType> var1);

    /**
     * Receiver for the outcome of a queued request.
     *
     * @param <T> response message type
     */
    public static interface ExecuteRequestCallback<T extends Message> {
        /** Called with the response when the request completes successfully. */
        public void setResponse(T var1);

        /** Called with the failure when the request, or resolving its target node, fails. */
        public void indicateError(Throwable var1);
    }

    /**
     * A request whose target node is fixed at construction time.
     *
     * @param <REQUEST> request message type
     * @param <RESPONSE> response message type
     */
    @VisibleForTesting
    protected static abstract class NodeCallableRequest<REQUEST extends Message, RESPONSE extends Message>
    extends CallableRequest<REQUEST, RESPONSE> {
        /** Node this request is addressed to. */
        protected final LlapNodeId nodeId;

        /**
         * @param nodeId node the request is addressed to
         * @param request request message
         * @param callback receiver of the response or error
         */
        protected NodeCallableRequest(LlapNodeId nodeId, REQUEST request, ExecuteRequestCallback<RESPONSE> callback) {
            super(request, callback);
            this.nodeId = nodeId;
        }

        /** Returns the node supplied at construction; never throws. */
        @Override
        public LlapNodeId getNodeId() {
            return nodeId;
        }
    }

    /**
     * A node request issued through the Hadoop IPC client's asynchronous mode. {@link #call()}
     * only sends the request; the pending response is exposed via {@link #getResponseFuture()}.
     *
     * @param <REQUEST> request message type
     * @param <RESPONSE> response message type
     */
    protected static abstract class AsyncCallableRequest<REQUEST extends Message, RESPONSE extends Message>
    extends NodeCallableRequest<REQUEST, RESPONSE> {
        /** Overall budget, in milliseconds, for getting the request accepted. */
        private static final long TIMEOUT = 60000L;
        /** First back-off sleep in milliseconds; doubled after every back-off. */
        private static final long BACKOFF_START = 10L;
        /** Number of immediate retries between two back-off sleeps. */
        private static final int FAST_RETRIES = 5;
        /** Handle for the pending response; set once the request has been sent. */
        private AsyncGet<Message, Exception> responseFuture;

        protected AsyncCallableRequest(LlapNodeId nodeId, REQUEST request, ExecuteRequestCallback<RESPONSE> callback) {
            super(nodeId, request, callback);
        }

        /**
         * Sends the request in asynchronous mode, retrying while the client reports that its
         * async call limit is exceeded. After every {@code FAST_RETRIES} such failures the
         * thread sleeps with a doubling back-off, and a {@code HiveException} is thrown once
         * the deadline has passed. The client's previous async mode is always restored.
         *
         * @return always null; the response is obtained through {@link #getResponseFuture()}
         * @throws Exception any failure from {@link #callInternal()} other than the async
         *         call limit, a timeout, or an interruption while backing off
         */
        @Override
        public RESPONSE call() throws Exception {
            final boolean previousMode = Client.isAsynchronousMode();
            final long deadline = System.currentTimeMillis() + TIMEOUT;
            int fastRetries = 0;
            long backoffMs = BACKOFF_START;
            try {
                Client.setAsynchronousMode(true);
                while (true) {
                    try {
                        this.callInternal();
                        break;
                    } catch (Exception ex) {
                        boolean limitExceeded = ex instanceof ServiceException
                            && ex.getCause() instanceof AsyncCallLimitExceededException;
                        if (!limitExceeded) {
                            throw ex;
                        }
                        fastRetries++;
                        if (fastRetries >= FAST_RETRIES) {
                            Thread.sleep(backoffMs);
                            if (System.currentTimeMillis() > deadline) {
                                throw new HiveException("Async request timed out in  " + TIMEOUT + " ms.", ex.getCause());
                            }
                            fastRetries = 0;
                            backoffMs *= 2L;
                        }
                        LOG.trace("Async call limit exceeded", (Throwable) ex);
                    }
                }
                this.responseFuture = ProtobufRpcEngine.getAsyncReturnMessage();
                return null;
            } finally {
                Client.setAsynchronousMode(previousMode);
            }
        }

        /** Performs the actual RPC invocation; the default implementation does nothing. */
        public void callInternal() throws Exception {
        }

        /** Returns the handle captured by the last successful {@link #call()}, or null before that. */
        public AsyncGet<Message, Exception> getResponseFuture() {
            return this.responseFuture;
        }
    }

    /**
     * Base type for everything the request manager schedules: a request message, the
     * callback to notify, and a way to determine the target node.
     *
     * @param <REQUEST> request message type
     * @param <RESPONSE> response message type
     */
    @VisibleForTesting
    protected static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message>
    implements Callable<RESPONSE> {
        /** Request message supplied at construction. */
        protected final REQUEST request;
        /** Receiver of the response or error. */
        protected final ExecuteRequestCallback<RESPONSE> callback;

        protected CallableRequest(REQUEST request, ExecuteRequestCallback<RESPONSE> callback) {
            this.request = request;
            this.callback = callback;
        }

        /**
         * Resolves the node this request should run against.
         *
         * @throws Exception if the node cannot be determined
         */
        public abstract LlapNodeId getNodeId() throws Exception;

        /** Returns the callback supplied at construction. */
        public ExecuteRequestCallback<RESPONSE> getCallback() {
            return callback;
        }

        /** Executes the request. */
        @Override
        public abstract RESPONSE call() throws Exception;
    }

    private static final class AsyncResponseCallback<TYPE extends Message>
    implements FutureCallback<TYPE> {
        private final ExecuteRequestCallback<TYPE> callback;
        private final LlapNodeId nodeId;
        private final RequestManager requestManager;
        private final AsyncCallableRequest request;
        private final AsyncResponseHandler asyncResponseHandler;

        public AsyncResponseCallback(ExecuteRequestCallback<TYPE> callback, LlapNodeId nodeId, RequestManager requestManager, AsyncCallableRequest request, AsyncResponseHandler asyncResponseHandler) {
            this.callback = callback;
            this.nodeId = nodeId;
            this.requestManager = requestManager;
            this.request = request;
            this.asyncResponseHandler = asyncResponseHandler;
        }

        public void onSuccess(TYPE result) {
            this.asyncResponseHandler.addToAsyncResponseFutureQueue(this.request);
        }

        public void onFailure(Throwable t) {
            try {
                this.callback.indicateError(t);
            }
            finally {
                this.requestManager.requestFinished(this.nodeId);
            }
        }
    }

    private static final class ResponseCallback<TYPE extends Message>
    implements FutureCallback<TYPE> {
        private final ExecuteRequestCallback<TYPE> callback;
        private final LlapNodeId nodeId;
        private final RequestManager requestManager;

        public ResponseCallback(ExecuteRequestCallback<TYPE> callback, LlapNodeId nodeId, RequestManager requestManager) {
            this.callback = callback;
            this.nodeId = nodeId;
            this.requestManager = requestManager;
        }

        public void onSuccess(TYPE result) {
            try {
                this.callback.setResponse(result);
            }
            finally {
                this.requestManager.requestFinished(this.nodeId);
            }
        }

        public void onFailure(Throwable t) {
            try {
                this.callback.indicateError(t);
            }
            finally {
                this.requestManager.requestFinished(this.nodeId);
            }
        }
    }

    @VisibleForTesting
    public static class RequestManager
    implements Callable<Void> {
        /** Held by the run loop during each iteration and by notifyRunLoop() while signalling. */
        private final Lock lock = new ReentrantLock();
        /** Set once by shutdown(); checked by the run loop, process() and handleInterrupt(). */
        private final AtomicBoolean isShutdown = new AtomicBoolean(false);
        /** Awaited by the run loop when no work is flagged; signalled by notifyRunLoop(). */
        private final Condition queueCondition = this.lock.newCondition();
        /** Set when a request is queued or finished; reset at the start of each process() pass. */
        private final AtomicBoolean shouldRun = new AtomicBoolean(false);
        /** Upper bound on concurrently running requests per node, enforced in canRunForNode(). */
        private final int maxConcurrentRequestsPerNode;
        /** Worker pool that executes submitted requests. */
        private final ListeningExecutorService executor;
        /** Requests added by queueRequest(); guarded by synchronizing on the list itself. */
        private final LinkedList<CallableRequest<?, ?>> newRequestList = new LinkedList();
        /** Requests waiting to be submitted; only touched by process() and drainNewRequestList(). */
        private final LinkedList<CallableRequest<?, ?>> pendingRequests = new LinkedList();
        /** Per-node count of requests currently submitted and not yet drained as finished. */
        private final ConcurrentMap<LlapNodeId, AtomicInteger> runningRequests = new ConcurrentHashMap<LlapNodeId, AtomicInteger>();
        /** Nodes reported through requestFinished(); guarded by synchronizing on the list itself. */
        private final LinkedList<LlapNodeId> completedNodes = new LinkedList();
        /** Handler that receives successfully sent async requests; created and started in the constructor. */
        private final AsyncResponseHandler asyncResponseHandler;
        /** Nodes found to be at capacity during the current process() pass; cleared each pass. */
        @VisibleForTesting
        Set<LlapNodeId> currentLoopDisabledNodes = new HashSet<LlapNodeId>();
        /** Requests deferred during the current process() pass; re-added to the front of pendingRequests. */
        @VisibleForTesting
        List<CallableRequest<?, ?>> currentLoopSkippedRequests = new LinkedList();

        /**
         * Creates the worker pool and starts the async response handler.
         *
         * @param numThreads number of worker threads executing requests
         * @param maxPerNode maximum number of concurrently running requests per node
         */
        public RequestManager(int numThreads, int maxPerNode) {
            this.maxConcurrentRequestsPerNode = maxPerNode;
            this.executor = MoreExecutors.listeningDecorator(
                Executors.newFixedThreadPool(
                    numThreads,
                    new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%2d").build()));
            this.asyncResponseHandler = new AsyncResponseHandler(this);
            this.asyncResponseHandler.start();
        }

        /**
         * Run loop: until shutdown, waits on the queue condition when no work is flagged,
         * then performs one {@link #process()} pass while holding the lock. The loop ends
         * when process() asks to stop or when the thread is interrupted.
         *
         * @return always null
         */
        @Override
        public Void call() {
            while (!this.isShutdown.get()) {
                this.lock.lock();
                try {
                    if (!this.shouldRun.get()) {
                        this.queueCondition.await();
                    }
                    if (this.process()) {
                        break;
                    }
                } catch (InterruptedException e) {
                    // Either returns quietly (shutdown in progress) or throws.
                    this.handleInterrupt(e);
                    break;
                } finally {
                    this.lock.unlock();
                }
            }
            LOG.info("CallScheduler loop exiting");
            return null;
        }

        /**
         * Treats an interrupt as expected when shutdown has been requested; otherwise logs
         * a warning and rethrows it wrapped in a {@link RuntimeException}.
         */
        private void handleInterrupt(InterruptedException e) {
            if (!this.isShutdown.get()) {
                LOG.warn("RunLoop interrupted without being shutdown first");
                throw new RuntimeException(e);
            }
        }

        /**
         * Adds a request to the incoming list, flags that there is work, and wakes the
         * run loop.
         *
         * @param request the request to schedule
         */
        public void queueRequest(CallableRequest<?, ?> request) {
            synchronized (this.newRequestList) {
                this.newRequestList.add(request);
                this.shouldRun.set(true);
            }
            this.notifyRunLoop();
        }

        /**
         * Records that a request for the given node has finished, flags that there is
         * work, and wakes the run loop so the node's running count can be decremented.
         *
         * @param nodeId node whose request completed
         */
        public void requestFinished(LlapNodeId nodeId) {
            synchronized (this.completedNodes) {
                this.completedNodes.add(nodeId);
                this.shouldRun.set(true);
            }
            this.notifyRunLoop();
        }

        /**
         * Shuts the manager down once: stops the async response handler and the worker
         * pool, then wakes the run loop so it can observe the shutdown flag. Subsequent
         * calls do nothing.
         */
        public void shutdown() {
            boolean alreadyShutdown = this.isShutdown.getAndSet(true);
            if (alreadyShutdown) {
                return;
            }
            this.asyncResponseHandler.shutdownNow();
            this.executor.shutdownNow();
            this.notifyRunLoop();
        }

        /**
         * Submits a request to the worker pool and attaches the completion callback that
         * matches its kind: {@code AsyncResponseCallback} for {@link AsyncCallableRequest}s,
         * {@code ResponseCallback} for everything else.
         *
         * @param request the request to execute
         * @param nodeId node the request was resolved to
         */
        @VisibleForTesting
        <T extends Message, U extends Message> void submitToExecutor(CallableRequest<T, U> request, LlapNodeId nodeId) {
            ListenableFuture<U> future = this.executor.submit(request);
            FutureCallback<U> completion;
            if (request instanceof AsyncCallableRequest) {
                completion = new AsyncResponseCallback<U>(request.getCallback(), nodeId, this, (AsyncCallableRequest) request, this.asyncResponseHandler);
            } else {
                completion = new ResponseCallback<U>(request.getCallback(), nodeId, this);
            }
            Futures.addCallback(future, completion, MoreExecutors.directExecutor());
        }

        /**
         * Performs one scheduling pass: pulls in newly queued requests, applies completed
         * requests to the per-node counters, then submits every pending request whose node
         * has capacity. Requests for nodes at capacity are put back at the front of the
         * pending list in their original order.
         *
         * @return true if the manager is shut down and the run loop should exit
         * @throws InterruptedException if resolving a request's node is interrupted
         */
        @VisibleForTesting
        boolean process() throws InterruptedException {
            if (this.isShutdown.get()) {
                return true;
            }
            this.currentLoopDisabledNodes.clear();
            this.currentLoopSkippedRequests.clear();
            // Reset the flag before draining so work queued during this pass triggers another one.
            this.shouldRun.compareAndSet(true, false);
            this.drainNewRequestList();
            this.drainCompletedNodes();

            while (!this.pendingRequests.isEmpty()) {
                CallableRequest<?, ?> request = this.pendingRequests.removeFirst();
                LlapNodeId nodeId;
                try {
                    nodeId = request.getNodeId();
                } catch (InterruptedException e) {
                    throw e;
                } catch (Exception e) {
                    request.getCallback().indicateError(e);
                    continue;
                }
                if (!this.canRunForNode(nodeId, this.currentLoopDisabledNodes)) {
                    // Once a node is full, later requests for it are skipped too, which keeps their order.
                    this.currentLoopDisabledNodes.add(nodeId);
                    this.currentLoopSkippedRequests.add(request);
                    continue;
                }
                this.submitToExecutor(request, nodeId);
            }
            this.pendingRequests.addAll(0, this.currentLoopSkippedRequests);
            return false;
        }

        /** Moves all newly queued requests to the end of the pending list. */
        private void drainNewRequestList() {
            synchronized (this.newRequestList) {
                if (this.newRequestList.isEmpty()) {
                    return;
                }
                this.pendingRequests.addAll(this.newRequestList);
                this.newRequestList.clear();
            }
        }

        /**
         * Decrements the running-request counter once for every completion reported since
         * the last pass, then empties the completion list.
         */
        private void drainCompletedNodes() {
            synchronized (this.completedNodes) {
                for (LlapNodeId finishedNode : this.completedNodes) {
                    this.runningRequests.get(finishedNode).decrementAndGet();
                }
                this.completedNodes.clear();
            }
        }

        /**
         * Tries to reserve a slot for one more request on a node.
         *
         * @param nodeId node the request targets
         * @param currentRunDisabledNodes nodes already found to be at capacity in this pass
         * @return true if a slot was reserved (the node's counter stays incremented);
         *         false if the node is disabled for this pass or already at its limit
         */
        private boolean canRunForNode(LlapNodeId nodeId, Set<LlapNodeId> currentRunDisabledNodes) {
            if (currentRunDisabledNodes.contains(nodeId)) {
                return false;
            }
            AtomicInteger running = this.runningRequests.get(nodeId);
            if (running == null) {
                AtomicInteger fresh = new AtomicInteger(0);
                AtomicInteger existing = this.runningRequests.putIfAbsent(nodeId, fresh);
                running = existing == null ? fresh : existing;
            }
            if (running.incrementAndGet() > this.maxConcurrentRequestsPerNode) {
                // Over the limit: undo the tentative reservation.
                running.decrementAndGet();
                return false;
            }
            return true;
        }

        /** Signals the queue condition under the lock so a waiting run loop wakes up. */
        private void notifyRunLoop() {
            final Lock guard = this.lock;
            guard.lock();
            try {
                this.queueCondition.signal();
            } finally {
                guard.unlock();
            }
        }
    }
}

