/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kudu.client;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.apache.kudu.Schema;
import org.apache.kudu.annotations.InterfaceAudience;
import org.apache.kudu.annotations.InterfaceStability;
import org.apache.kudu.client.AlterTableOptions;
import org.apache.kudu.client.AlterTableRequest;
import org.apache.kudu.client.AlterTableResponse;
import org.apache.kudu.client.AsyncKuduScanner;
import org.apache.kudu.client.AsyncKuduSession;
import org.apache.kudu.client.Bytes;
import org.apache.kudu.client.CallResponse;
import org.apache.kudu.client.ConnectionCache;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.CreateTableRequest;
import org.apache.kudu.client.CreateTableResponse;
import org.apache.kudu.client.DeadlineTracker;
import org.apache.kudu.client.DeleteTableRequest;
import org.apache.kudu.client.DeleteTableResponse;
import org.apache.kudu.client.ExternalConsistencyMode;
import org.apache.kudu.client.GetMasterRegistrationReceived;
import org.apache.kudu.client.GetMasterRegistrationRequest;
import org.apache.kudu.client.GetMasterRegistrationResponse;
import org.apache.kudu.client.GetTableLocationsRequest;
import org.apache.kudu.client.GetTableSchemaRequest;
import org.apache.kudu.client.GetTableSchemaResponse;
import org.apache.kudu.client.IsAlterTableDoneRequest;
import org.apache.kudu.client.IsAlterTableDoneResponse;
import org.apache.kudu.client.IsCreateTableDoneRequest;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduRpc;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.ListTablesRequest;
import org.apache.kudu.client.ListTablesResponse;
import org.apache.kudu.client.ListTabletServersRequest;
import org.apache.kudu.client.ListTabletServersResponse;
import org.apache.kudu.client.LocatedTablet;
import org.apache.kudu.client.NoLeaderFoundException;
import org.apache.kudu.client.NonCoveredRangeException;
import org.apache.kudu.client.NonRecoverableException;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.RecoverableException;
import org.apache.kudu.client.RemoteTablet;
import org.apache.kudu.client.RequestTracker;
import org.apache.kudu.client.RpcTraceFrame;
import org.apache.kudu.client.ServerInfo;
import org.apache.kudu.client.Statistics;
import org.apache.kudu.client.Status;
import org.apache.kudu.client.TableLocationsCache;
import org.apache.kudu.client.TabletClient;
import org.apache.kudu.client.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.kudu.client.shaded.com.google.common.base.Preconditions;
import org.apache.kudu.client.shaded.com.google.common.collect.Lists;
import org.apache.kudu.client.shaded.com.google.common.net.HostAndPort;
import org.apache.kudu.client.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.kudu.client.shaded.com.google.protobuf.Message;
import org.apache.kudu.client.shaded.org.jboss.netty.buffer.ChannelBuffer;
import org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorkerPool;
import org.apache.kudu.client.shaded.org.jboss.netty.util.HashedWheelTimer;
import org.apache.kudu.client.shaded.org.jboss.netty.util.Timeout;
import org.apache.kudu.client.shaded.org.jboss.netty.util.TimerTask;
import org.apache.kudu.master.Master;
import org.apache.kudu.util.AsyncUtil;
import org.apache.kudu.util.NetUtil;
import org.apache.kudu.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Unstable
public class AsyncKuduClient
implements AutoCloseable {
    public static final Logger LOG = LoggerFactory.getLogger(AsyncKuduClient.class);
    public static final int SLEEP_TIME = 500;
    public static final byte[] EMPTY_ARRAY = new byte[0];
    public static final long NO_TIMESTAMP = -1L;
    public static final long DEFAULT_OPERATION_TIMEOUT_MS = 30000L;
    public static final long DEFAULT_SOCKET_READ_TIMEOUT_MS = 10000L;
    private static final long MAX_RPC_ATTEMPTS = 100L;
    static final int MAX_RETURNED_TABLE_LOCATIONS = 10;
    private final ClientSocketChannelFactory channelFactory;
    private final ConcurrentHashMap<String, TableLocationsCache> tableLocations = new ConcurrentHashMap();
    private final ConnectionCache connectionCache;
    @GuardedBy(value="sessions")
    private final Set<AsyncKuduSession> sessions = new HashSet<AsyncKuduSession>();
    static final String MASTER_TABLE_NAME_PLACEHOLDER = "Kudu Master";
    final KuduTable masterTable;
    private final List<HostAndPort> masterAddresses;
    private final HashedWheelTimer timer;
    private long lastPropagatedTimestamp = -1L;
    private final Set<String> tablesNotServed = Collections.newSetFromMap(new ConcurrentHashMap());
    private final Semaphore masterLookups = new Semaphore(50);
    private final Random sleepRandomizer = new Random();
    private final long defaultOperationTimeoutMs;
    private final long defaultAdminOperationTimeoutMs;
    private final long defaultSocketReadTimeoutMs;
    private final Statistics statistics;
    private final boolean statisticsDisabled;
    private final RequestTracker requestTracker;
    private volatile boolean closed;

    private AsyncKuduClient(AsyncKuduClientBuilder b) {
        this.channelFactory = b.createChannelFactory();
        this.masterAddresses = b.masterAddresses;
        this.masterTable = new KuduTable(this, MASTER_TABLE_NAME_PLACEHOLDER, MASTER_TABLE_NAME_PLACEHOLDER, null, null);
        this.defaultOperationTimeoutMs = b.defaultOperationTimeoutMs;
        this.defaultAdminOperationTimeoutMs = b.defaultAdminOperationTimeoutMs;
        this.defaultSocketReadTimeoutMs = b.defaultSocketReadTimeoutMs;
        this.statisticsDisabled = b.statisticsDisabled;
        this.statistics = this.statisticsDisabled ? null : new Statistics();
        this.timer = b.timer;
        String clientId = UUID.randomUUID().toString().replace("-", "");
        this.requestTracker = new RequestTracker(clientId);
        this.connectionCache = new ConnectionCache(this);
    }

    @VisibleForTesting
    public synchronized void updateLastPropagatedTimestamp(long lastPropagatedTimestamp) {
        if (this.lastPropagatedTimestamp == -1L || this.lastPropagatedTimestamp < lastPropagatedTimestamp) {
            this.lastPropagatedTimestamp = lastPropagatedTimestamp;
        }
    }

    @VisibleForTesting
    public synchronized long getLastPropagatedTimestamp() {
        return this.lastPropagatedTimestamp;
    }

    public KuduClient syncClient() {
        return new KuduClient(this);
    }

    public Deferred<KuduTable> createTable(final String name, Schema schema, CreateTableOptions builder) {
        this.checkIsClosed();
        if (builder == null) {
            throw new IllegalArgumentException("CreateTableOptions may not be null");
        }
        if (!builder.getBuilder().getPartitionSchema().hasRangeSchema() && builder.getBuilder().getPartitionSchema().getHashBucketSchemasCount() == 0) {
            throw new IllegalArgumentException("Table partitioning must be specified using setRangePartitionColumns or addHashPartitions");
        }
        CreateTableRequest create = new CreateTableRequest(this.masterTable, name, schema, builder);
        create.setTimeoutMillis(this.defaultAdminOperationTimeoutMs);
        return this.sendRpcToTablet(create).addCallbackDeferring((Callback)new Callback<Deferred<KuduTable>, CreateTableResponse>(){

            public Deferred<KuduTable> call(CreateTableResponse createTableResponse) throws Exception {
                return AsyncKuduClient.this.openTable(name);
            }
        });
    }

    public Deferred<DeleteTableResponse> deleteTable(String name) {
        this.checkIsClosed();
        DeleteTableRequest delete = new DeleteTableRequest(this.masterTable, name);
        delete.setTimeoutMillis(this.defaultAdminOperationTimeoutMs);
        return this.sendRpcToTablet(delete);
    }

    public Deferred<AlterTableResponse> alterTable(String name, AlterTableOptions ato) {
        this.checkIsClosed();
        AlterTableRequest alter = new AlterTableRequest(this.masterTable, name, ato);
        alter.setTimeoutMillis(this.defaultAdminOperationTimeoutMs);
        Deferred<AlterTableResponse> response = this.sendRpcToTablet(alter);
        if (ato.hasAddDropRangePartitions()) {
            return response.addCallback((Callback)new Callback<AlterTableResponse, AlterTableResponse>(){

                public AlterTableResponse call(AlterTableResponse resp) {
                    if (resp.getTableId() != null) {
                        AsyncKuduClient.this.tableLocations.remove(resp.getTableId());
                    } else {
                        AsyncKuduClient.this.tableLocations.clear();
                    }
                    return resp;
                }

                public String toString() {
                    return "ClearTableLocationsCacheCB";
                }
            }).addErrback((Callback)new Callback<Exception, Exception>(){

                public Exception call(Exception e) {
                    AsyncKuduClient.this.tableLocations.clear();
                    return e;
                }

                public String toString() {
                    return "ClearTableLocationsCacheEB";
                }
            });
        }
        return response;
    }

    public Deferred<IsAlterTableDoneResponse> isAlterTableDone(String name) {
        this.checkIsClosed();
        IsAlterTableDoneRequest request = new IsAlterTableDoneRequest(this.masterTable, name);
        request.setTimeoutMillis(this.defaultAdminOperationTimeoutMs);
        return this.sendRpcToTablet(request);
    }

    public Deferred<ListTabletServersResponse> listTabletServers() {
        this.checkIsClosed();
        ListTabletServersRequest rpc = new ListTabletServersRequest(this.masterTable);
        rpc.setTimeoutMillis(this.defaultAdminOperationTimeoutMs);
        return this.sendRpcToTablet(rpc);
    }

    Deferred<GetTableSchemaResponse> getTableSchema(String name) {
        GetTableSchemaRequest rpc = new GetTableSchemaRequest(this.masterTable, name);
        rpc.setTimeoutMillis(this.defaultAdminOperationTimeoutMs);
        return this.sendRpcToTablet(rpc);
    }

    public Deferred<ListTablesResponse> getTablesList() {
        return this.getTablesList(null);
    }

    public Deferred<ListTablesResponse> getTablesList(String nameFilter) {
        ListTablesRequest rpc = new ListTablesRequest(this.masterTable, nameFilter);
        rpc.setTimeoutMillis(this.defaultAdminOperationTimeoutMs);
        return this.sendRpcToTablet(rpc);
    }

    public Deferred<Boolean> tableExists(final String name) {
        if (name == null) {
            throw new IllegalArgumentException("The table name cannot be null");
        }
        return this.getTablesList().addCallbackDeferring((Callback)new Callback<Deferred<Boolean>, ListTablesResponse>(){

            public Deferred<Boolean> call(ListTablesResponse listTablesResponse) throws Exception {
                for (String tableName : listTablesResponse.getTablesList()) {
                    if (!name.equals(tableName)) continue;
                    return Deferred.fromResult((Object)true);
                }
                return Deferred.fromResult((Object)false);
            }
        });
    }

    public Deferred<KuduTable> openTable(final String name) {
        this.checkIsClosed();
        final KuduRpc<KuduTable> fakeRpc = new KuduRpc<KuduTable>(null){

            @Override
            ChannelBuffer serialize(Message header) {
                return null;
            }

            @Override
            String serviceName() {
                return null;
            }

            @Override
            String method() {
                return "IsCreateTableDone";
            }

            @Override
            Pair<KuduTable, Object> deserialize(CallResponse callResponse, String tsUUID) throws KuduException {
                return null;
            }
        };
        fakeRpc.setTimeoutMillis(this.defaultAdminOperationTimeoutMs);
        return this.getTableSchema(name).addCallbackDeferring((Callback)new Callback<Deferred<KuduTable>, GetTableSchemaResponse>(){

            public Deferred<KuduTable> call(GetTableSchemaResponse response) throws Exception {
                KuduTable table = new KuduTable(AsyncKuduClient.this, name, response.getTableId(), response.getSchema(), response.getPartitionSchema());
                Deferred d = fakeRpc.getDeferred();
                if (response.isCreateTableDone()) {
                    LOG.debug("Opened table {}", (Object)name);
                    fakeRpc.callback(table);
                } else {
                    LOG.debug("Delaying opening table {}, its tablets aren't fully created", (Object)name);
                    fakeRpc.attempt = (byte)(fakeRpc.attempt + 1);
                    AsyncKuduClient.this.delayedIsCreateTableDone(table, fakeRpc, AsyncKuduClient.this.getOpenTableCB(fakeRpc, table), AsyncKuduClient.this.getDelayedIsCreateTableDoneErrback(fakeRpc));
                }
                return d;
            }
        });
    }

    Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB> getOpenTableCB(final KuduRpc<KuduTable> rpc, final KuduTable table) {
        return new Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB>(){

            public Deferred<KuduTable> call(Master.IsCreateTableDoneResponsePB isCreateTableDoneResponsePB) throws Exception {
                String tableName = table.getName();
                Deferred d = rpc.getDeferred();
                if (isCreateTableDoneResponsePB.getDone()) {
                    LOG.debug("Table {}'s tablets are now created", (Object)tableName);
                    rpc.callback(table);
                } else {
                    rpc.attempt = (byte)(rpc.attempt + 1);
                    LOG.debug("Table {}'s tablets are still not created, further delaying opening it", (Object)tableName);
                    AsyncKuduClient.this.delayedIsCreateTableDone(table, rpc, AsyncKuduClient.this.getOpenTableCB(rpc, table), AsyncKuduClient.this.getDelayedIsCreateTableDoneErrback(rpc));
                }
                return d;
            }
        };
    }

    public long getDefaultOperationTimeoutMs() {
        return this.defaultOperationTimeoutMs;
    }

    public long getDefaultAdminOperationTimeoutMs() {
        return this.defaultAdminOperationTimeoutMs;
    }

    public long getDefaultSocketReadTimeoutMs() {
        return this.defaultSocketReadTimeoutMs;
    }

    public boolean isStatisticsEnabled() {
        return !this.statisticsDisabled;
    }

    public Statistics getStatistics() {
        if (this.statisticsDisabled) {
            throw new IllegalStateException("This client's statistics is disabled");
        }
        return this.statistics;
    }

    RequestTracker getRequestTracker() {
        return this.requestTracker;
    }

    HashedWheelTimer getTimer() {
        return this.timer;
    }

    ClientSocketChannelFactory getChannelFactory() {
        return this.channelFactory;
    }

    public AsyncKuduScanner.AsyncKuduScannerBuilder newScannerBuilder(KuduTable table) {
        this.checkIsClosed();
        return new AsyncKuduScanner.AsyncKuduScannerBuilder(this, table);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public AsyncKuduSession newSession() {
        this.checkIsClosed();
        AsyncKuduSession session = new AsyncKuduSession(this);
        Set<AsyncKuduSession> set = this.sessions;
        synchronized (set) {
            this.sessions.add(session);
        }
        return session;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removeSession(AsyncKuduSession session) {
        Set<AsyncKuduSession> set = this.sessions;
        synchronized (set) {
            boolean removed = this.sessions.remove(session);
            assert (removed);
        }
    }

    Deferred<AsyncKuduScanner.Response> scanNextRows(AsyncKuduScanner scanner) {
        RemoteTablet tablet = scanner.currentTablet();
        assert (tablet != null);
        KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
        String uuid = tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection());
        TabletClient client = this.connectionCache.getClient(uuid);
        Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
        nextRequest.attempt = (byte)(nextRequest.attempt + 1);
        if (client == null || !client.isAlive()) {
            Status statusRemoteError = Status.RemoteError("Not connected to server " + uuid + " will retry after a delay");
            return this.delayedSendRpcToTablet(nextRequest, new RecoverableException(statusRemoteError));
        }
        client.sendRpc(nextRequest);
        return d;
    }

    Deferred<AsyncKuduScanner.Response> closeScanner(AsyncKuduScanner scanner) {
        RemoteTablet tablet = scanner.currentTablet();
        if (tablet == null) {
            return Deferred.fromResult(null);
        }
        KuduRpc<AsyncKuduScanner.Response> closeRequest = scanner.getCloseRequest();
        TabletClient client = this.connectionCache.getClient(tablet.getReplicaSelectedUUID(closeRequest.getReplicaSelection()));
        if (client == null || !client.isAlive()) {
            LOG.warn("Cannot close {} properly, no connection open for {}", (Object)scanner, (Object)tablet);
            return Deferred.fromResult(null);
        }
        Deferred<AsyncKuduScanner.Response> d = closeRequest.getDeferred();
        closeRequest.attempt = (byte)(closeRequest.attempt + 1);
        client.sendRpc(closeRequest);
        return d;
    }

    <R> Deferred<R> sendRpcToTablet(KuduRpc<R> request) {
        RemoteTablet tablet;
        String uuid;
        byte[] partitionKey;
        if (AsyncKuduClient.cannotRetryRequest(request)) {
            return AsyncKuduClient.tooManyAttemptsOrTimeout(request, null);
        }
        request.attempt = (byte)(request.attempt + 1);
        String tableId = request.getTable().getTableId();
        TableLocationsCache.Entry entry = this.getTableLocationEntry(tableId, partitionKey = request.partitionKey());
        if (entry != null && entry.isNonCoveredRange()) {
            NonCoveredRangeException e = new NonCoveredRangeException(entry.getLowerBoundPartitionKey(), entry.getUpperBoundPartitionKey());
            request.errback(e);
            return Deferred.fromError((Exception)e);
        }
        long lastPropagatedTs = this.getLastPropagatedTimestamp();
        if (request.getExternalConsistencyMode() == ExternalConsistencyMode.CLIENT_PROPAGATED && lastPropagatedTs != -1L) {
            request.setPropagatedTimestamp(lastPropagatedTs);
        }
        if (entry != null && (uuid = (tablet = entry.getTablet()).getReplicaSelectedUUID(request.getReplicaSelection())) != null) {
            Deferred<R> d = request.getDeferred();
            request.setTablet(tablet);
            TabletClient client = this.connectionCache.getLiveClient(uuid);
            if (client != null) {
                client.sendRpc(request);
                return d;
            }
        }
        request.addTrace(new RpcTraceFrame.RpcTraceFrameBuilder(request.method(), RpcTraceFrame.Action.QUERY_MASTER).build());
        if (this.tablesNotServed.contains(tableId)) {
            return this.delayedIsCreateTableDone(request.getTable(), request, new RetryRpcCB(request), this.getDelayedIsCreateTableDoneErrback(request));
        }
        RetryRpcCB cb = new RetryRpcCB(request);
        RetryRpcErrback<R> eb = new RetryRpcErrback<R>(request);
        Deferred<Master.GetTableLocationsResponsePB> returnedD = this.locateTablet(request.getTable(), partitionKey, request);
        return AsyncUtil.addCallbacksDeferring(returnedD, cb, eb);
    }

    <R> Callback<Exception, Exception> getDelayedIsCreateTableDoneErrback(final KuduRpc<R> request) {
        return new Callback<Exception, Exception>(){

            public Exception call(Exception e) throws Exception {
                request.errback(e);
                return e;
            }
        };
    }

    <R> Deferred<R> delayedIsCreateTableDone(final KuduTable table, final KuduRpc<R> rpc, final Callback<Deferred<R>, Master.IsCreateTableDoneResponsePB> retryCB, final Callback<Exception, Exception> errback) {
        long sleepTime = this.getSleepTimeForRpc(rpc);
        if (rpc.deadlineTracker.wouldSleepingTimeout(sleepTime)) {
            return AsyncKuduClient.tooManyAttemptsOrTimeout(rpc, null);
        }
        final class RetryTimer
        implements TimerTask {
            RetryTimer() {
            }

            @Override
            public void run(Timeout timeout) {
                String tableId = table.getTableId();
                boolean has_permit = AsyncKuduClient.this.acquireMasterLookupPermit();
                if (!has_permit && !AsyncKuduClient.this.tablesNotServed.contains(tableId)) {
                    try {
                        retryCB.call(null);
                        return;
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
                IsCreateTableDoneRequest isCreateTableDoneRequest = new IsCreateTableDoneRequest(AsyncKuduClient.this.masterTable, tableId);
                isCreateTableDoneRequest.setTimeoutMillis(AsyncKuduClient.this.defaultAdminOperationTimeoutMs);
                isCreateTableDoneRequest.setParentRpc(rpc);
                Deferred d = AsyncKuduClient.this.sendRpcToTablet(isCreateTableDoneRequest).addCallback((Callback)new IsCreateTableDoneCB(tableId));
                if (has_permit) {
                    d.addCallbacks(new ReleaseMasterLookupPermit(), new ReleaseMasterLookupPermit());
                }
                d.addCallbacks(retryCB, errback);
            }
        }
        this.newTimeout(new RetryTimer(), sleepTime);
        return rpc.getDeferred();
    }

    boolean isTableNotServed(String tableId) {
        return this.tablesNotServed.contains(tableId);
    }

    long getSleepTimeForRpc(KuduRpc<?> rpc) {
        byte attemptCount = rpc.attempt;
        assert (attemptCount > 0);
        if (attemptCount == 0) {
            LOG.warn("Possible bug: attempting to retry an RPC with no attempts. RPC: " + rpc, (Throwable)new Exception("Exception created to collect stack trace"));
            attemptCount = 1;
        }
        long sleepTime = (long)(Math.pow(2.0, Math.min(attemptCount, 12)) * this.sleepRandomizer.nextDouble());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Going to sleep for " + sleepTime + " at retry " + rpc.attempt);
        }
        return sleepTime;
    }

    @VisibleForTesting
    List<TabletClient> getTabletClients() {
        return this.connectionCache.getImmutableTabletClientsList();
    }

    @VisibleForTesting
    TabletClient getTabletClient(String uuid) {
        return this.connectionCache.getClient(uuid);
    }

    @VisibleForTesting
    void emptyTabletsCacheForTable(String tableId) {
        this.tableLocations.remove(tableId);
    }

    static boolean cannotRetryRequest(KuduRpc<?> rpc) {
        return rpc.deadlineTracker.timedOut() || (long)rpc.attempt > 100L;
    }

    static <R> Deferred<R> tooManyAttemptsOrTimeout(KuduRpc<R> request, KuduException cause) {
        String message = (long)request.attempt > 100L ? "Too many attempts: " : "RPC can not complete before timeout: ";
        Status statusTimedOut = Status.TimedOut(message + request);
        NonRecoverableException e = new NonRecoverableException(statusTimedOut, (Throwable)cause);
        LOG.debug("Cannot continue with this RPC: {} because of: {}", new Object[]{request, message, e});
        request.errback(e);
        return Deferred.fromError((Exception)e);
    }

    private Deferred<Master.GetTableLocationsResponsePB> locateTablet(KuduTable table, byte[] partitionKey, KuduRpc<?> parentRpc) {
        Deferred<Master.GetTableLocationsResponsePB> d;
        TableLocationsCache.Entry entry;
        boolean hasPermit = this.acquireMasterLookupPermit();
        String tableId = table.getTableId();
        if (!hasPermit && (entry = this.getTableLocationEntry(tableId, partitionKey)) != null && !entry.isNonCoveredRange() && entry.getTablet().getLeaderUUID() != null) {
            return Deferred.fromResult(null);
        }
        if (this.isMasterTable(tableId)) {
            d = this.getMasterTableLocationsPB(parentRpc);
        } else {
            GetTableLocationsRequest rpc = new GetTableLocationsRequest(this.masterTable, partitionKey, null, tableId);
            if (parentRpc != null) {
                rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
                rpc.setParentRpc(parentRpc);
            } else {
                rpc.setTimeoutMillis(this.defaultAdminOperationTimeoutMs);
            }
            d = this.sendRpcToTablet(rpc);
        }
        d.addCallback((Callback)new MasterLookupCB(table, partitionKey));
        if (hasPermit) {
            d.addBoth(new ReleaseMasterLookupPermit());
        }
        return d;
    }

    Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
        Deferred responseD = new Deferred();
        GetMasterRegistrationReceived received = new GetMasterRegistrationReceived(this.masterAddresses, (Deferred<Master.GetTableLocationsResponsePB>)responseD);
        for (HostAndPort hostAndPort : this.masterAddresses) {
            Deferred d;
            TabletClient clientForHostAndPort = this.newMasterClient(hostAndPort);
            if (clientForHostAndPort == null) {
                String message = "Couldn't resolve this master's address " + hostAndPort.toString();
                LOG.warn(message);
                Status statusIOE = Status.IOError(message);
                d = Deferred.fromError((Exception)new NonRecoverableException(statusIOE));
            } else {
                d = this.getMasterRegistration(clientForHostAndPort, parentRpc);
            }
            d.addCallbacks(received.callbackForNode(hostAndPort), received.errbackForNode(hostAndPort));
        }
        return responseD;
    }

    List<LocatedTablet> syncLocateTable(KuduTable table, byte[] startPartitionKey, byte[] endPartitionKey, long deadline) throws Exception {
        return (List)this.locateTable(table, startPartitionKey, endPartitionKey, deadline).join();
    }

    private Deferred<List<LocatedTablet>> loopLocateTable(final KuduTable table, byte[] startPartitionKey, final byte[] endPartitionKey, final List<LocatedTablet> ret, final DeadlineTracker deadlineTracker) {
        Preconditions.checkArgument(startPartitionKey == null || startPartitionKey.length > 0, "use null for unbounded start partition key");
        Preconditions.checkArgument(endPartitionKey == null || endPartitionKey.length > 0, "use null for unbounded end partition key");
        byte[] partitionKey = startPartitionKey;
        String tableId = table.getTableId();
        while (partitionKey == null || partitionKey.length > 0 && (endPartitionKey == null || Bytes.memcmp(partitionKey, endPartitionKey) < 0)) {
            byte[] key = partitionKey == null ? EMPTY_ARRAY : partitionKey;
            TableLocationsCache.Entry entry = this.getTableLocationEntry(tableId, key);
            if (entry != null) {
                if (!entry.isNonCoveredRange()) {
                    ret.add(new LocatedTablet(entry.getTablet()));
                }
                partitionKey = entry.getUpperBoundPartitionKey();
                continue;
            }
            if (deadlineTracker.timedOut()) {
                Status statusTimedOut = Status.TimedOut("Took too long getting the list of tablets, " + deadlineTracker);
                return Deferred.fromError((Exception)new NonRecoverableException(statusTimedOut));
            }
            final byte[] lookupKey = partitionKey;
            return this.locateTablet(table, key, null).addCallbackDeferring((Callback)new Callback<Deferred<List<LocatedTablet>>, Master.GetTableLocationsResponsePB>(){

                public Deferred<List<LocatedTablet>> call(Master.GetTableLocationsResponsePB resp) {
                    return AsyncKuduClient.this.loopLocateTable(table, lookupKey, endPartitionKey, ret, deadlineTracker);
                }

                public String toString() {
                    return "LoopLocateTableCB";
                }
            });
        }
        return Deferred.fromResult(ret);
    }

    Deferred<List<LocatedTablet>> locateTable(KuduTable table, byte[] startPartitionKey, byte[] endPartitionKey, long deadline) {
        ArrayList<LocatedTablet> ret = Lists.newArrayList();
        DeadlineTracker deadlineTracker = new DeadlineTracker();
        deadlineTracker.setDeadline(deadline);
        return this.loopLocateTable(table, startPartitionKey, endPartitionKey, ret, deadlineTracker);
    }

    <R> void handleTabletNotFound(KuduRpc<R> rpc, KuduException ex, TabletClient server) {
        this.invalidateTabletCache(rpc.getTablet(), server);
        this.handleRetryableError(rpc, ex);
    }

    <R> void handleNotLeader(KuduRpc<R> rpc, KuduException ex, TabletClient server) {
        rpc.getTablet().demoteLeader(server.getServerInfo().getUuid());
        this.handleRetryableError(rpc, ex);
    }

    <R> void handleRetryableError(KuduRpc<R> rpc, KuduException ex) {
        this.delayedSendRpcToTablet(rpc, ex);
    }

    private <R> Deferred<R> delayedSendRpcToTablet(final KuduRpc<R> rpc, KuduException ex) {
        assert (ex != null);
        Status reasonForRetry = ex.getStatus();
        rpc.addTrace(new RpcTraceFrame.RpcTraceFrameBuilder(rpc.method(), RpcTraceFrame.Action.SLEEP_THEN_RETRY).callStatus(reasonForRetry).build());
        long sleepTime = this.getSleepTimeForRpc(rpc);
        if (AsyncKuduClient.cannotRetryRequest(rpc) || rpc.deadlineTracker.wouldSleepingTimeout(sleepTime)) {
            return AsyncKuduClient.tooManyAttemptsOrTimeout(rpc, ex);
        }
        final class RetryTimer
        implements TimerTask {
            RetryTimer() {
            }

            @Override
            public void run(Timeout timeout) {
                AsyncKuduClient.this.sendRpcToTablet(rpc);
            }
        }
        this.newTimeout(new RetryTimer(), sleepTime);
        return rpc.getDeferred();
    }

    private void invalidateTabletCache(RemoteTablet tablet, TabletClient server) {
        String uuid = server.getServerInfo().getUuid();
        LOG.info("Removing server {} from this tablet's cache {}", (Object)uuid, (Object)tablet.getTabletId());
        tablet.removeTabletClient(uuid);
    }

    boolean acquireMasterLookupPermit() {
        try {
            return this.masterLookups.tryAcquire(5L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void releaseMasterLookupPermit() {
        this.masterLookups.release();
    }

    @VisibleForTesting
    void discoverTablets(KuduTable table, byte[] requestPartitionKey, List<Master.TabletLocationsPB> locations, long ttl) throws KuduException {
        TableLocationsCache existingLocationsCache;
        String tableId = table.getTableId();
        String tableName = table.getName();
        TableLocationsCache locationsCache = this.tableLocations.get(tableId);
        if (locationsCache == null && (existingLocationsCache = this.tableLocations.putIfAbsent(tableId, locationsCache = new TableLocationsCache())) != null) {
            locationsCache = existingLocationsCache;
        }
        ArrayList<RemoteTablet> tablets = new ArrayList<RemoteTablet>(locations.size());
        for (Master.TabletLocationsPB tabletPb : locations) {
            ArrayList<UnknownHostException> lookupExceptions = new ArrayList<UnknownHostException>(tabletPb.getReplicasCount());
            ArrayList<ServerInfo> servers = new ArrayList<ServerInfo>(tabletPb.getReplicasCount());
            for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) {
                try {
                    ServerInfo serverInfo = this.connectionCache.connectTS(replica.getTsInfo());
                    if (serverInfo == null) continue;
                    servers.add(serverInfo);
                }
                catch (UnknownHostException ex) {
                    lookupExceptions.add(ex);
                }
            }
            if (!lookupExceptions.isEmpty() && lookupExceptions.size() == tabletPb.getReplicasCount()) {
                Status statusIOE = Status.IOError("Couldn't find any valid locations, exceptions: " + lookupExceptions);
                throw new NonRecoverableException(statusIOE);
            }
            RemoteTablet rt = new RemoteTablet(tableId, tabletPb, servers);
            LOG.info("Learned about tablet {} for table '{}' with partition {}", new Object[]{rt.getTabletId(), tableName, rt.getPartition()});
            tablets.add(rt);
        }
        locationsCache.cacheTabletLocations(tablets, requestPartitionKey, ttl);
        TableLocationsCache.Entry entry = locationsCache.get(requestPartitionKey);
        if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderUUID() == null) {
            throw new NoLeaderFoundException(Status.NotFound("Tablet " + entry.toString() + " doesn't have a leader"));
        }
    }

    TableLocationsCache.Entry getTableLocationEntry(String tableId, byte[] partitionKey) {
        TableLocationsCache cache = this.tableLocations.get(tableId);
        if (cache == null) {
            return null;
        }
        return cache.get(partitionKey);
    }

    Deferred<LocatedTablet> getTabletLocation(final KuduTable table, final byte[] partitionKey, long deadline) {
        Deferred<List<LocatedTablet>> locatedTablets = partitionKey.length == 0 ? this.locateTable(table, null, new byte[]{0}, deadline) : this.locateTable(table, partitionKey, Arrays.copyOf(partitionKey, partitionKey.length + 1), deadline);
        return locatedTablets.addCallbackDeferring((Callback)new Callback<Deferred<LocatedTablet>, List<LocatedTablet>>(){

            public Deferred<LocatedTablet> call(List<LocatedTablet> tablets) {
                Preconditions.checkArgument(tablets.size() <= 1, "found more than one tablet for a single partition key");
                if (tablets.size() == 0) {
                    TableLocationsCache.Entry entry = AsyncKuduClient.this.getTableLocationEntry(table.getTableId(), partitionKey);
                    if (entry == null) {
                        LOG.debug("Table location expired before it could be processed; retrying.");
                        return Deferred.fromError((Exception)new RecoverableException(Status.NotFound("Table location expired before it could be processed")));
                    }
                    if (entry.isNonCoveredRange()) {
                        return Deferred.fromError((Exception)new NonCoveredRangeException(entry.getLowerBoundPartitionKey(), entry.getUpperBoundPartitionKey()));
                    }
                    return Deferred.fromResult((Object)new LocatedTablet(entry.getTablet()));
                }
                return Deferred.fromResult((Object)tablets.get(0));
            }
        });
    }

    Deferred<GetMasterRegistrationResponse> getMasterRegistration(TabletClient masterClient, KuduRpc<?> parentRpc) {
        GetMasterRegistrationRequest rpc = new GetMasterRegistrationRequest(this.masterTable);
        if (parentRpc != null) {
            rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
            rpc.setParentRpc(parentRpc);
        } else {
            rpc.setTimeoutMillis(this.defaultAdminOperationTimeoutMs);
        }
        Deferred d = rpc.getDeferred();
        rpc.attempt = (byte)(rpc.attempt + 1);
        masterClient.sendRpc(rpc);
        return d;
    }

    TabletClient newMasterClient(HostAndPort masterHostPort) {
        InetAddress inetAddress = NetUtil.getInetAddress(masterHostPort.getHostText());
        if (inetAddress == null) {
            return null;
        }
        return this.connectionCache.newClient("Kudu Master - " + masterHostPort.toString(), inetAddress, masterHostPort.getPort());
    }

    @Override
    public void close() throws Exception {
        this.shutdown().join();
    }

    public Deferred<ArrayList<Void>> shutdown() {
        this.checkIsClosed();
        this.closed = true;
        final class DisconnectCB
        implements Callback<Deferred<ArrayList<Void>>, ArrayList<List<OperationResponse>>> {
            DisconnectCB() {
            }

            public Deferred<ArrayList<Void>> call(ArrayList<List<OperationResponse>> ignoredResponses) {
                final class ReleaseResourcesCB
                implements Callback<ArrayList<Void>, ArrayList<Void>> {
                    ReleaseResourcesCB() {
                    }

                    public ArrayList<Void> call(ArrayList<Void> arg) {
                        LOG.debug("Releasing all remaining resources");
                        AsyncKuduClient.this.timer.stop();
                        final class ShutdownThread
                        extends Thread {
                            ShutdownThread() {
                                super("AsyncKuduClient@" + AsyncKuduClient.super.hashCode() + " shutdown");
                            }

                            @Override
                            public void run() {
                                AsyncKuduClient.this.channelFactory.releaseExternalResources();
                            }
                        }
                        new ShutdownThread().start();
                        return arg;
                    }

                    public String toString() {
                        return "release resources callback";
                    }
                }
                return AsyncKuduClient.this.connectionCache.disconnectEverything().addCallback((Callback)new ReleaseResourcesCB());
            }

            public String toString() {
                return "disconnect callback";
            }
        }
        return this.closeAllSessions().addCallbackDeferring((Callback)new DisconnectCB());
    }

    private void checkIsClosed() {
        if (this.closed) {
            throw new IllegalStateException("Cannot proceed, the client has already been closed");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Deferred<ArrayList<List<OperationResponse>>> closeAllSessions() {
        HashSet<AsyncKuduSession> copyOfSessions;
        Set<AsyncKuduSession> set = this.sessions;
        synchronized (set) {
            copyOfSessions = new HashSet<AsyncKuduSession>(this.sessions);
        }
        if (this.sessions.isEmpty()) {
            return Deferred.fromResult(null);
        }
        ArrayList<Deferred<List<OperationResponse>>> deferreds = new ArrayList<Deferred<List<OperationResponse>>>(copyOfSessions.size());
        for (AsyncKuduSession session : copyOfSessions) {
            deferreds.add(session.close());
        }
        return Deferred.group(deferreds);
    }

    private boolean isMasterTable(String tableId) {
        return MASTER_TABLE_NAME_PLACEHOLDER == tableId;
    }

    void newTimeout(TimerTask task, long timeout_ms) {
        try {
            this.timer.newTimeout(task, timeout_ms, TimeUnit.MILLISECONDS);
        }
        catch (IllegalStateException e) {
            LOG.warn("Failed to schedule timer.  Ignore this if we're shutting down.", (Throwable)e);
        }
    }

    @InterfaceAudience.Public
    @InterfaceStability.Evolving
    public static final class AsyncKuduClientBuilder {
        private static final int DEFAULT_MASTER_PORT = 7051;
        private static final int DEFAULT_BOSS_COUNT = 1;
        private static final int DEFAULT_WORKER_COUNT = 2 * Runtime.getRuntime().availableProcessors();
        private final List<HostAndPort> masterAddresses;
        private long defaultAdminOperationTimeoutMs = 30000L;
        private long defaultOperationTimeoutMs = 30000L;
        private long defaultSocketReadTimeoutMs = 10000L;
        private final HashedWheelTimer timer = new HashedWheelTimer(new ThreadFactoryBuilder().setDaemon(true).build(), 20L, TimeUnit.MILLISECONDS);
        private Executor bossExecutor;
        private Executor workerExecutor;
        private int bossCount = 1;
        private int workerCount = DEFAULT_WORKER_COUNT;
        private boolean statisticsDisabled = false;

        public AsyncKuduClientBuilder(String masterAddresses) {
            this.masterAddresses = NetUtil.parseStrings(masterAddresses, 7051);
        }

        public AsyncKuduClientBuilder(List<String> masterAddresses) {
            this.masterAddresses = Lists.newArrayListWithCapacity(masterAddresses.size());
            for (String address : masterAddresses) {
                this.masterAddresses.add(NetUtil.parseString(address, 7051));
            }
        }

        public AsyncKuduClientBuilder defaultAdminOperationTimeoutMs(long timeoutMs) {
            this.defaultAdminOperationTimeoutMs = timeoutMs;
            return this;
        }

        public AsyncKuduClientBuilder defaultOperationTimeoutMs(long timeoutMs) {
            this.defaultOperationTimeoutMs = timeoutMs;
            return this;
        }

        public AsyncKuduClientBuilder defaultSocketReadTimeoutMs(long timeoutMs) {
            this.defaultSocketReadTimeoutMs = timeoutMs;
            return this;
        }

        public AsyncKuduClientBuilder nioExecutors(Executor bossExecutor, Executor workerExecutor) {
            this.bossExecutor = bossExecutor;
            this.workerExecutor = workerExecutor;
            return this;
        }

        public AsyncKuduClientBuilder bossCount(int bossCount) {
            Preconditions.checkArgument(bossCount > 0, "bossCount should be greater than 0");
            this.bossCount = bossCount;
            return this;
        }

        public AsyncKuduClientBuilder workerCount(int workerCount) {
            Preconditions.checkArgument(workerCount > 0, "workerCount should be greater than 0");
            this.workerCount = workerCount;
            return this;
        }

        private NioClientSocketChannelFactory createChannelFactory() {
            Executor boss = this.bossExecutor;
            Executor worker = this.workerExecutor;
            if (boss == null || worker == null) {
                ExecutorService defaultExec = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("kudu-nio-%d").setDaemon(true).build());
                if (boss == null) {
                    boss = defaultExec;
                }
                if (worker == null) {
                    worker = defaultExec;
                }
            }
            return new NioClientSocketChannelFactory(boss, this.bossCount, new NioWorkerPool(worker, this.workerCount), this.timer);
        }

        public AsyncKuduClientBuilder disableStatistics() {
            this.statisticsDisabled = true;
            return this;
        }

        public AsyncKuduClient build() {
            return new AsyncKuduClient(this);
        }
    }

    private final class MasterLookupCB
    implements Callback<Object, Master.GetTableLocationsResponsePB> {
        final KuduTable table;
        private final byte[] partitionKey;

        MasterLookupCB(KuduTable table, byte[] partitionKey) {
            this.table = table;
            this.partitionKey = partitionKey;
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public Object call(Master.GetTableLocationsResponsePB response) {
            if (response.hasError()) {
                if (response.getError().getCode() == Master.MasterErrorPB.Code.TABLET_NOT_RUNNING) {
                    LOG.debug("Table {} has a non-running tablet", (Object)this.table.getName());
                    AsyncKuduClient.this.tablesNotServed.add(this.table.getTableId());
                    return null;
                }
                Status status = Status.fromMasterErrorPB(response.getError());
                return new NonRecoverableException(status);
            }
            try {
                AsyncKuduClient.this.discoverTablets(this.table, this.partitionKey, response.getTabletLocationsList(), response.getTtlMillis());
                return null;
            }
            catch (KuduException e) {
                return e;
            }
        }

        public String toString() {
            return "get tablet locations from the master for table " + this.table.getName();
        }
    }

    private final class IsCreateTableDoneCB
    implements Callback<Master.IsCreateTableDoneResponsePB, Master.IsCreateTableDoneResponsePB> {
        final String tableName;

        IsCreateTableDoneCB(String tableName) {
            this.tableName = tableName;
        }

        public Master.IsCreateTableDoneResponsePB call(Master.IsCreateTableDoneResponsePB response) {
            if (response.getDone()) {
                LOG.debug("Table {} was created", (Object)this.tableName);
                AsyncKuduClient.this.tablesNotServed.remove(this.tableName);
            } else {
                LOG.debug("Table {} is still being created", (Object)this.tableName);
            }
            return response;
        }

        public String toString() {
            return "ask the master if " + this.tableName + " was created";
        }
    }

    private final class ReleaseMasterLookupPermit<T>
    implements Callback<T, T> {
        private ReleaseMasterLookupPermit() {
        }

        public T call(T arg) {
            AsyncKuduClient.this.releaseMasterLookupPermit();
            return arg;
        }

        public String toString() {
            return "release master lookup permit";
        }
    }

    final class RetryRpcErrback<R>
    implements Callback<Deferred<R>, Exception> {
        private final KuduRpc<R> request;

        public RetryRpcErrback(KuduRpc<R> request) {
            this.request = request;
        }

        public Deferred<R> call(Exception arg) {
            if (arg instanceof RecoverableException) {
                return AsyncKuduClient.this.delayedSendRpcToTablet(this.request, (KuduException)arg);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Notify RPC %s after lookup exception", this.request), (Throwable)arg);
            }
            this.request.errback(arg);
            return Deferred.fromError((Exception)arg);
        }

        public String toString() {
            return "retry RPC after error";
        }
    }

    final class RetryRpcCB<R, D>
    implements Callback<Deferred<R>, D> {
        private final KuduRpc<R> request;

        RetryRpcCB(KuduRpc<R> request) {
            this.request = request;
        }

        public Deferred<R> call(D arg) {
            LOG.debug("Retrying sending RPC {} after lookup", this.request);
            return AsyncKuduClient.this.sendRpcToTablet(this.request);
        }

        public String toString() {
            return "retry RPC";
        }
    }
}

