/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kudu.client;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Common;
import org.apache.kudu.Schema;
import org.apache.kudu.annotations.InterfaceAudience;
import org.apache.kudu.annotations.InterfaceStability;
import org.apache.kudu.client.AbstractKuduScannerBuilder;
import org.apache.kudu.client.AsyncKuduClient;
import org.apache.kudu.client.Bytes;
import org.apache.kudu.client.CallResponse;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduRpc;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.NonCoveredRangeException;
import org.apache.kudu.client.NonRecoverableException;
import org.apache.kudu.client.Partition;
import org.apache.kudu.client.PartitionPruner;
import org.apache.kudu.client.ProtobufHelper;
import org.apache.kudu.client.RowResultIterator;
import org.apache.kudu.client.Status;
import org.apache.kudu.client.shaded.com.google.common.base.Preconditions;
import org.apache.kudu.client.shaded.com.google.common.collect.ImmutableList;
import org.apache.kudu.client.shaded.com.google.protobuf.Message;
import org.apache.kudu.client.shaded.com.google.protobuf.ZeroCopyLiteralByteString;
import org.apache.kudu.client.shaded.org.jboss.netty.buffer.ChannelBuffer;
import org.apache.kudu.tserver.Tserver;
import org.apache.kudu.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Unstable
public final class AsyncKuduScanner {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncKuduScanner.class);
    private final AsyncKuduClient client;
    private final KuduTable table;
    private final Schema schema;
    private final PartitionPruner pruner;
    private final Map<String, KuduPredicate> predicates;
    private final int batchSizeBytes;
    private final long limit;
    private final byte[] startPrimaryKey;
    private final byte[] endPrimaryKey;
    private final boolean prefetching;
    private final boolean cacheBlocks;
    private final ReadMode readMode;
    private final Common.OrderMode orderMode;
    private final long htTimestamp;
    private boolean closed = false;
    private boolean hasMore = true;
    private AsyncKuduClient.RemoteTablet tablet;
    private byte[] scannerId;
    private int sequenceId;
    private Deferred<RowResultIterator> prefetcherDeferred;
    private boolean inFirstTablet = true;
    final long scanRequestTimeout;
    private final Callback<RowResultIterator, RowResultIterator> prefetch = new Callback<RowResultIterator, RowResultIterator>(){

        public RowResultIterator call(RowResultIterator arg) throws Exception {
            if (AsyncKuduScanner.this.hasMoreRows()) {
                AsyncKuduScanner.this.prefetcherDeferred = AsyncKuduScanner.this.client.scanNextRows(AsyncKuduScanner.this).addCallbacks(AsyncKuduScanner.this.got_next_row, AsyncKuduScanner.this.nextRowErrback());
            }
            return null;
        }
    };
    private final Callback<RowResultIterator, Response> got_next_row = new Callback<RowResultIterator, Response>(){

        public RowResultIterator call(Response resp) {
            if (!resp.more) {
                AsyncKuduScanner.this.scanFinished();
                return resp.data;
            }
            AsyncKuduScanner.this.sequenceId++;
            AsyncKuduScanner.this.hasMore = resp.more;
            return resp.data;
        }

        public String toString() {
            return "get nextRows response";
        }
    };

    AsyncKuduScanner(AsyncKuduClient client, KuduTable table, List<String> projectedNames, List<Integer> projectedIndexes, ReadMode readMode, Common.OrderMode orderMode, long scanRequestTimeout, Map<String, KuduPredicate> predicates, long limit, boolean cacheBlocks, boolean prefetching, byte[] startPrimaryKey, byte[] endPrimaryKey, long htTimestamp, int batchSizeBytes, PartitionPruner pruner) {
        Preconditions.checkArgument(batchSizeBytes > 0, "Need a strictly positive number of bytes, got %s", batchSizeBytes);
        Preconditions.checkArgument(limit > 0L, "Need a strictly positive number for the limit, got %s", limit);
        if (htTimestamp != -1L) {
            Preconditions.checkArgument(htTimestamp >= 0L, "Need non-negative number for the scan,  timestamp got %s", htTimestamp);
            Preconditions.checkArgument(readMode == ReadMode.READ_AT_SNAPSHOT, "When specifying a HybridClock timestamp, the read mode needs to be set to READ_AT_SNAPSHOT");
        }
        if (orderMode == Common.OrderMode.ORDERED) {
            Preconditions.checkArgument(readMode == ReadMode.READ_AT_SNAPSHOT, "Returning rows in primary key order requires the read mode to be set to READ_AT_SNAPSHOT");
        }
        this.client = client;
        this.table = table;
        this.pruner = pruner;
        this.readMode = readMode;
        this.orderMode = orderMode;
        this.scanRequestTimeout = scanRequestTimeout;
        this.predicates = predicates;
        this.limit = limit;
        this.cacheBlocks = cacheBlocks;
        this.prefetching = prefetching;
        this.startPrimaryKey = startPrimaryKey;
        this.endPrimaryKey = endPrimaryKey;
        this.htTimestamp = htTimestamp;
        this.batchSizeBytes = batchSizeBytes;
        if (projectedNames != null) {
            ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>();
            for (String columnName : projectedNames) {
                ColumnSchema originalColumn = table.getSchema().getColumn(columnName);
                columns.add(AsyncKuduScanner.getStrippedColumnSchema(originalColumn));
            }
            this.schema = new Schema(columns);
        } else if (projectedIndexes != null) {
            ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>();
            for (Integer columnIndex : projectedIndexes) {
                ColumnSchema originalColumn = table.getSchema().getColumnByIndex(columnIndex);
                columns.add(AsyncKuduScanner.getStrippedColumnSchema(originalColumn));
            }
            this.schema = new Schema(columns);
        } else {
            this.schema = table.getSchema();
        }
        if (!pruner.hasMorePartitionKeyRanges()) {
            LOG.debug("Short circuiting scan with predicates: {}", predicates.values());
            this.hasMore = false;
            this.closed = true;
        }
    }

    private static ColumnSchema getStrippedColumnSchema(ColumnSchema columnToClone) {
        return new ColumnSchema.ColumnSchemaBuilder(columnToClone.getName(), columnToClone.getType()).nullable(columnToClone.isNullable()).build();
    }

    public long getLimit() {
        return this.limit;
    }

    public boolean hasMoreRows() {
        return this.hasMore;
    }

    public boolean getCacheBlocks() {
        return this.cacheBlocks;
    }

    public long getBatchSizeBytes() {
        return this.batchSizeBytes;
    }

    public ReadMode getReadMode() {
        return this.readMode;
    }

    private Common.OrderMode getOrderMode() {
        return this.orderMode;
    }

    public Schema getProjectionSchema() {
        return this.schema;
    }

    long getSnapshotTimestamp() {
        return this.htTimestamp;
    }

    public Deferred<RowResultIterator> nextRows() {
        if (this.closed) {
            return Deferred.fromResult(null);
        }
        if (this.tablet == null) {
            Callback<Deferred<RowResultIterator>, Response> cb = new Callback<Deferred<RowResultIterator>, Response>(){

                public Deferred<RowResultIterator> call(Response resp) throws Exception {
                    if (!resp.more || resp.scanner_id == null) {
                        AsyncKuduScanner.this.scanFinished();
                        return Deferred.fromResult((Object)resp.data);
                    }
                    AsyncKuduScanner.access$302(AsyncKuduScanner.this, resp.scanner_id);
                    AsyncKuduScanner.this.sequenceId++;
                    AsyncKuduScanner.this.hasMore = resp.more;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Scanner " + Bytes.pretty(AsyncKuduScanner.this.scannerId) + " opened on " + AsyncKuduScanner.this.tablet);
                    }
                    return Deferred.fromResult((Object)resp.data);
                }

                public String toString() {
                    return "scanner opened";
                }
            };
            Callback<Deferred<RowResultIterator>, Exception> eb = new Callback<Deferred<RowResultIterator>, Exception>(){

                public Deferred<RowResultIterator> call(Exception e) throws Exception {
                    AsyncKuduScanner.this.invalidate();
                    if (e instanceof NonCoveredRangeException) {
                        NonCoveredRangeException ncre = (NonCoveredRangeException)e;
                        AsyncKuduScanner.this.pruner.removePartitionKeyRange(ncre.getNonCoveredRangeEnd());
                        if (!AsyncKuduScanner.this.pruner.hasMorePartitionKeyRanges()) {
                            AsyncKuduScanner.this.hasMore = false;
                            AsyncKuduScanner.this.closed = true;
                            return Deferred.fromResult((Object)RowResultIterator.empty());
                        }
                        AsyncKuduScanner.access$302(AsyncKuduScanner.this, null);
                        AsyncKuduScanner.this.sequenceId = 0;
                        return AsyncKuduScanner.this.nextRows();
                    }
                    LOG.warn("Can not open scanner", (Throwable)e);
                    return Deferred.fromError((Exception)e);
                }

                public String toString() {
                    return "open scanner errback";
                }
            };
            return this.client.sendRpcToTablet(this.getOpenRequest()).addCallbackDeferring((Callback)cb).addErrback((Callback)eb);
        }
        if (this.prefetching && this.prefetcherDeferred != null) {
            this.prefetcherDeferred.chain(new Deferred().addCallback(this.prefetch));
            return this.prefetcherDeferred;
        }
        Deferred d = this.client.scanNextRows(this).addCallbacks(this.got_next_row, this.nextRowErrback());
        if (this.prefetching) {
            d.chain(new Deferred().addCallback(this.prefetch));
        }
        return d;
    }

    private final Callback<Exception, Exception> nextRowErrback() {
        return new Callback<Exception, Exception>(){

            public Exception call(Exception error) {
                AsyncKuduClient.RemoteTablet old_tablet = AsyncKuduScanner.this.tablet;
                String message = old_tablet + " pretends to not know " + AsyncKuduScanner.this;
                LOG.warn(message, (Throwable)error);
                AsyncKuduScanner.this.invalidate();
                return error;
            }

            public String toString() {
                return "NextRow errback";
            }
        };
    }

    void scanFinished() {
        Partition partition = this.tablet.getPartition();
        this.pruner.removePartitionKeyRange(partition.getPartitionKeyEnd());
        if (!this.pruner.hasMorePartitionKeyRanges()) {
            this.hasMore = false;
            this.closed = true;
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Done scanning tablet {} for partition {} with scanner id {}", new Object[]{this.tablet.getTabletIdAsString(), this.tablet.getPartition(), Bytes.pretty(this.scannerId)});
        }
        this.scannerId = null;
        this.sequenceId = 0;
        this.invalidate();
    }

    public Deferred<RowResultIterator> close() {
        if (this.closed) {
            return Deferred.fromResult(null);
        }
        Deferred d = this.client.closeScanner(this).addCallback(this.closedCallback());
        return d;
    }

    private Callback<RowResultIterator, Response> closedCallback() {
        return new Callback<RowResultIterator, Response>(){

            public RowResultIterator call(Response response) {
                AsyncKuduScanner.this.closed = true;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Scanner " + Bytes.pretty(AsyncKuduScanner.this.scannerId) + " closed on " + AsyncKuduScanner.this.tablet);
                }
                AsyncKuduScanner.this.tablet = null;
                AsyncKuduScanner.access$302(AsyncKuduScanner.this, "client debug closed".getBytes());
                return response == null ? null : response.data;
            }

            public String toString() {
                return "scanner closed";
            }
        };
    }

    public String toString() {
        String tablet = this.tablet == null ? "null" : this.tablet.getTabletIdAsString();
        StringBuilder buf = new StringBuilder();
        buf.append("KuduScanner(table=");
        buf.append(this.table.getName());
        buf.append(", tablet=").append(tablet);
        buf.append(", scannerId=").append(Bytes.pretty(this.scannerId));
        buf.append(", scanRequestTimeout=").append(this.scanRequestTimeout);
        buf.append(')');
        return buf.toString();
    }

    KuduTable table() {
        return this.table;
    }

    void setTablet(AsyncKuduClient.RemoteTablet tablet) {
        this.tablet = tablet;
    }

    void invalidate() {
        this.tablet = null;
    }

    AsyncKuduClient.RemoteTablet currentTablet() {
        return this.tablet;
    }

    KuduRpc<Response> getOpenRequest() {
        this.checkScanningNotStarted();
        if (this.inFirstTablet) {
            this.inFirstTablet = false;
        }
        return new ScanRequest(this.table, State.OPENING);
    }

    KuduRpc<Response> getNextRowsRequest() {
        return new ScanRequest(this.table, State.NEXT);
    }

    KuduRpc<Response> getCloseRequest() {
        return new ScanRequest(this.table, State.CLOSING);
    }

    private void checkScanningNotStarted() {
        if (this.tablet != null) {
            throw new IllegalStateException("scanning already started");
        }
    }

    static /* synthetic */ byte[] access$302(AsyncKuduScanner x0, byte[] x1) {
        x0.scannerId = x1;
        return x1;
    }

    @InterfaceAudience.Public
    @InterfaceStability.Evolving
    public static class AsyncKuduScannerBuilder
    extends AbstractKuduScannerBuilder<AsyncKuduScannerBuilder, AsyncKuduScanner> {
        AsyncKuduScannerBuilder(AsyncKuduClient client, KuduTable table) {
            super(client, table);
        }

        @Override
        public AsyncKuduScanner build() {
            return new AsyncKuduScanner(this.client, this.table, this.projectedColumnNames, this.projectedColumnIndexes, this.readMode, this.orderMode, this.scanRequestTimeout, this.predicates, this.limit, this.cacheBlocks, this.prefetching, this.lowerBoundPrimaryKey, this.upperBoundPrimaryKey, this.htTimestamp, this.batchSizeBytes, PartitionPruner.create(this));
        }
    }

    private final class ScanRequest
    extends KuduRpc<Response> {
        State state;

        ScanRequest(KuduTable table, State state) {
            super(table);
            this.state = state;
            this.setTimeoutMillis(AsyncKuduScanner.this.scanRequestTimeout);
        }

        @Override
        String serviceName() {
            return "kudu.tserver.TabletServerService";
        }

        @Override
        String method() {
            return "Scan";
        }

        @Override
        Collection<Integer> getRequiredFeatures() {
            if (AsyncKuduScanner.this.predicates.isEmpty()) {
                return ImmutableList.of();
            }
            return ImmutableList.of(Integer.valueOf(1));
        }

        @Override
        ChannelBuffer serialize(Message header) {
            Tserver.ScanRequestPB.Builder builder = Tserver.ScanRequestPB.newBuilder();
            switch (this.state) {
                case OPENING: {
                    AsyncKuduScanner.this.tablet = super.getTablet();
                    Tserver.NewScanRequestPB.Builder newBuilder = Tserver.NewScanRequestPB.newBuilder();
                    newBuilder.setLimit(AsyncKuduScanner.this.limit);
                    newBuilder.addAllProjectedColumns(ProtobufHelper.schemaToListPb(AsyncKuduScanner.this.schema));
                    newBuilder.setTabletId(ZeroCopyLiteralByteString.wrap(AsyncKuduScanner.this.tablet.getTabletIdAsBytes()));
                    newBuilder.setReadMode(AsyncKuduScanner.this.getReadMode().pbVersion());
                    newBuilder.setOrderMode(AsyncKuduScanner.this.getOrderMode());
                    newBuilder.setCacheBlocks(AsyncKuduScanner.this.cacheBlocks);
                    if (this.table.getAsyncClient().getLastPropagatedTimestamp() != -1L) {
                        newBuilder.setPropagatedTimestamp(this.table.getAsyncClient().getLastPropagatedTimestamp());
                    }
                    newBuilder.setReadMode(AsyncKuduScanner.this.getReadMode().pbVersion());
                    if (AsyncKuduScanner.this.getReadMode() == ReadMode.READ_AT_SNAPSHOT && AsyncKuduScanner.this.getSnapshotTimestamp() != -1L) {
                        newBuilder.setSnapTimestamp(AsyncKuduScanner.this.getSnapshotTimestamp());
                    }
                    if (AsyncKuduScanner.this.startPrimaryKey.length > 0) {
                        newBuilder.setStartPrimaryKey(ZeroCopyLiteralByteString.copyFrom(AsyncKuduScanner.this.startPrimaryKey));
                    }
                    if (AsyncKuduScanner.this.endPrimaryKey.length > 0) {
                        newBuilder.setStopPrimaryKey(ZeroCopyLiteralByteString.copyFrom(AsyncKuduScanner.this.endPrimaryKey));
                    }
                    for (KuduPredicate pred : AsyncKuduScanner.this.predicates.values()) {
                        newBuilder.addColumnPredicates(pred.toPB());
                    }
                    builder.setNewScanRequest(newBuilder.build()).setBatchSizeBytes(AsyncKuduScanner.this.batchSizeBytes);
                    break;
                }
                case NEXT: {
                    this.setTablet(AsyncKuduScanner.this.tablet);
                    builder.setScannerId(ZeroCopyLiteralByteString.wrap(AsyncKuduScanner.this.scannerId)).setCallSeqId(AsyncKuduScanner.this.sequenceId).setBatchSizeBytes(AsyncKuduScanner.this.batchSizeBytes);
                    break;
                }
                case CLOSING: {
                    this.setTablet(AsyncKuduScanner.this.tablet);
                    builder.setScannerId(ZeroCopyLiteralByteString.wrap(AsyncKuduScanner.this.scannerId)).setBatchSizeBytes(0).setCloseScanner(true);
                }
            }
            Tserver.ScanRequestPB request = builder.build();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending scan req: " + request.toString());
            }
            return ScanRequest.toChannelBuffer(header, request);
        }

        @Override
        Pair<Response, Object> deserialize(CallResponse callResponse, String tsUUID) throws Exception {
            Tserver.TabletServerErrorPB error;
            Tserver.ScanResponsePB.Builder builder = Tserver.ScanResponsePB.newBuilder();
            ScanRequest.readProtobuf(callResponse.getPBMessage(), builder);
            Tserver.ScanResponsePB resp = builder.build();
            byte[] id = resp.getScannerId().toByteArray();
            Tserver.TabletServerErrorPB tabletServerErrorPB = error = resp.hasError() ? resp.getError() : null;
            if (error != null && error.getCode().equals(Tserver.TabletServerErrorPB.Code.TABLET_NOT_FOUND)) {
                if (this.state == State.OPENING) {
                    return new Pair<Object, Tserver.TabletServerErrorPB>(null, error);
                }
                Status statusIncomplete = Status.Incomplete("Cannot continue scanning, the tablet has moved and this isn't a fault tolerant scan");
                throw new NonRecoverableException(statusIncomplete);
            }
            RowResultIterator iterator = RowResultIterator.makeRowResultIterator(this.deadlineTracker.getElapsedMillis(), tsUUID, AsyncKuduScanner.this.schema, resp.getData(), callResponse);
            boolean hasMore = resp.getHasMoreResults();
            if (id.length != 0 && AsyncKuduScanner.this.scannerId != null && !Bytes.equals(AsyncKuduScanner.this.scannerId, id)) {
                Status statusIllegalState = Status.IllegalState("Scan RPC response was for scanner ID " + Bytes.pretty(id) + " but we expected " + Bytes.pretty(AsyncKuduScanner.this.scannerId));
                throw new NonRecoverableException(statusIllegalState);
            }
            Response response = new Response(id, iterator, hasMore);
            if (LOG.isDebugEnabled()) {
                LOG.debug(response.toString());
            }
            return new Pair<Response, Object>(response, error);
        }

        @Override
        public String toString() {
            return "ScanRequest(scannerId=" + Bytes.pretty(AsyncKuduScanner.this.scannerId) + (AsyncKuduScanner.this.tablet != null ? ", tabletSlice=" + AsyncKuduScanner.this.tablet.getTabletIdAsString() : "") + ", attempt=" + this.attempt + ')';
        }

        @Override
        public byte[] partitionKey() {
            return AsyncKuduScanner.this.pruner.nextPartitionKey();
        }
    }

    private static enum State {
        OPENING,
        NEXT,
        CLOSING;

    }

    static final class Response {
        private final byte[] scanner_id;
        private final RowResultIterator data;
        private final boolean more;

        Response(byte[] scanner_id, RowResultIterator data, boolean more) {
            this.scanner_id = scanner_id;
            this.data = data;
            this.more = more;
        }

        public String toString() {
            return "AsyncKuduScanner$Response(scannerId=" + Bytes.pretty(this.scanner_id) + ", data=" + this.data + ", more=" + this.more + ") ";
        }
    }

    @InterfaceAudience.Public
    @InterfaceStability.Evolving
    public static enum ReadMode {
        READ_LATEST(Common.ReadMode.READ_LATEST),
        READ_AT_SNAPSHOT(Common.ReadMode.READ_AT_SNAPSHOT);

        private Common.ReadMode pbVersion;

        private ReadMode(Common.ReadMode pbVersion) {
            this.pbVersion = pbVersion;
        }

        @InterfaceAudience.Private
        public Common.ReadMode pbVersion() {
            return this.pbVersion;
        }
    }
}

