/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.flight;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.ActionType;
import org.apache.arrow.flight.ArrowMessage;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightBindingService;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.auth.BasicClientAuthHandler;
import org.apache.arrow.flight.auth.ClientAuthHandler;
import org.apache.arrow.flight.auth.ClientAuthInterceptor;
import org.apache.arrow.flight.auth.ClientAuthWrapper;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

/**
 * Client for a Flight service, talking to a single {@link Location} over a gRPC channel.
 *
 * <p>Metadata calls ({@link #listFlights}, {@link #listActions}, {@link #getInfo},
 * {@link #doAction}) go through the blocking stub. Data calls ({@link #getStream},
 * {@link #startPut}) use custom method descriptors bound to this client's allocator.
 * Every call is routed through the same {@link ClientAuthInterceptor}.
 *
 * <p>The client owns a child allocator and a channel; call {@link #close()} to release both.
 */
public class FlightClient implements AutoCloseable {
    /** Value handed to {@link FlightStream} when opening a stream; presumably the read-ahead depth (confirm against FlightStream). */
    private static final int PENDING_REQUESTS = 5;
    /** Trace-event limit configured on the channel builder. */
    private static final int MAX_CHANNEL_TRACE_EVENTS = 0;

    private final BufferAllocator allocator;
    private final ManagedChannel channel;
    private final FlightServiceGrpc.FlightServiceBlockingStub blockingStub;
    private final FlightServiceGrpc.FlightServiceStub asyncStub;
    private final ClientAuthInterceptor authInterceptor = new ClientAuthInterceptor();
    private final MethodDescriptor<Flight.Ticket, ArrowMessage> doGetDescriptor;
    private final MethodDescriptor<ArrowMessage, Flight.PutResult> doPutDescriptor;

    /**
     * Creates a client connected to the given location.
     *
     * <p>The channel is plaintext, accepts inbound messages up to {@link Integer#MAX_VALUE}
     * bytes, and both stubs are wrapped with this client's auth interceptor.
     *
     * @param incomingAllocator parent allocator; a child named {@code "flight-client"} is created from it
     * @param location host and port of the Flight service
     */
    public FlightClient(BufferAllocator incomingAllocator, Location location) {
        ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
                .forAddress(location.getHost(), location.getPort())
                .maxTraceEvents(MAX_CHANNEL_TRACE_EVENTS)
                .maxInboundMessageSize(Integer.MAX_VALUE)
                .usePlaintext();
        this.allocator = incomingAllocator.newChildAllocator("flight-client", 0L, Long.MAX_VALUE);
        this.channel = channelBuilder.build();
        this.blockingStub = FlightServiceGrpc.newBlockingStub(this.channel).withInterceptors(this.authInterceptor);
        this.asyncStub = FlightServiceGrpc.newStub(this.channel).withInterceptors(this.authInterceptor);
        this.doGetDescriptor = FlightBindingService.getDoGetDescriptor(this.allocator);
        this.doPutDescriptor = FlightBindingService.getDoPutDescriptor(this.allocator);
    }

    /**
     * Lists the flights matching the given criteria.
     *
     * <p>The server's response is fully consumed before this method returns.
     *
     * @param criteria filter sent to the server
     * @return the matching flights
     */
    public Iterable<FlightInfo> listFlights(Criteria criteria) {
        return ImmutableList.copyOf(this.blockingStub.listFlights(criteria.asCriteria()))
                .stream()
                .map(t -> new FlightInfo(t))
                .collect(Collectors.toList());
    }

    /**
     * Lists the action types reported by the server.
     *
     * <p>The server's response is fully consumed before this method returns.
     *
     * @return the available action types
     */
    public Iterable<ActionType> listActions() {
        return ImmutableList.copyOf(this.blockingStub.listActions(Flight.Empty.getDefaultInstance()))
                .stream()
                .map(t -> new ActionType(t))
                .collect(Collectors.toList());
    }

    /**
     * Performs an action on the server.
     *
     * @param action the action to send
     * @return an iterator that converts each protocol result to a {@link Result} as it is consumed
     */
    public Iterator<Result> doAction(Action action) {
        return Iterators.transform(this.blockingStub.doAction(action.toProtocol()), t -> new Result(t));
    }

    /**
     * Authenticates with a username and password using {@link BasicClientAuthHandler}.
     *
     * @param username user name
     * @param password password
     */
    public void authenticateBasic(String username, String password) {
        BasicClientAuthHandler basicClient = new BasicClientAuthHandler(username, password);
        this.authenticate(basicClient);
    }

    /**
     * Authenticates with the given handler and stores the resulting token on the
     * auth interceptor.
     *
     * @param handler handler that drives the client side of the auth exchange
     * @throws IllegalArgumentException if a token has already been set
     */
    public void authenticate(ClientAuthHandler handler) {
        Preconditions.checkArgument(!this.authInterceptor.hasToken(), "Auth already completed.");
        this.authInterceptor.setToken(ClientAuthWrapper.doClientAuth(handler, this.asyncStub));
    }

    /**
     * Opens a put stream for the given descriptor and sends the schema of {@code root}
     * as the first message.
     *
     * @param descriptor descriptor of the flight being uploaded; must not be null
     * @param root vector root whose contents are sent by each {@link ClientStreamListener#putNext()}; must not be null
     * @return a listener used to push batches, finish the stream, and obtain the server's result
     */
    public ClientStreamListener startPut(FlightDescriptor descriptor, VectorSchemaRoot root) {
        Preconditions.checkNotNull(descriptor);
        Preconditions.checkNotNull(root);
        SetStreamObserver<Flight.PutResult> resultObserver = new SetStreamObserver<>();
        // Create the call via the auth interceptor (not the bare channel) so this call is
        // intercepted the same way as calls made through the stubs.
        ClientCall<ArrowMessage, Flight.PutResult> call =
                this.authInterceptor.interceptCall(this.doPutDescriptor, this.asyncStub.getCallOptions(), this.channel);
        ClientCallStreamObserver<ArrowMessage> observer =
                (ClientCallStreamObserver<ArrowMessage>) ClientCalls.asyncClientStreamingCall(call, resultObserver);
        // The first message carries the descriptor and schema; record batches follow via putNext().
        ArrowMessage message = new ArrowMessage(descriptor.toProtocol(), root.getSchema());
        observer.onNext(message);
        return new PutObserver(new VectorUnloader(root, true, true), observer, resultObserver.getFuture());
    }

    /**
     * Fetches the flight information for a descriptor.
     *
     * @param descriptor descriptor to look up
     * @return the information returned by the server
     */
    public FlightInfo getInfo(FlightDescriptor descriptor) {
        return new FlightInfo(this.blockingStub.getFlightInfo(descriptor.toProtocol()));
    }

    /**
     * Starts retrieving the stream identified by a ticket.
     *
     * <p>Automatic inbound flow control is disabled on the call; the returned
     * {@link FlightStream} is given callbacks to request more messages and to cancel the call.
     *
     * @param ticket ticket identifying the stream
     * @return the stream that receives the server's messages
     */
    public FlightStream getStream(Ticket ticket) {
        // Create the call via the auth interceptor (not the bare channel) so this call is
        // intercepted the same way as calls made through the stubs.
        final ClientCall<Flight.Ticket, ArrowMessage> call =
                this.authInterceptor.interceptCall(this.doGetDescriptor, this.asyncStub.getCallOptions(), this.channel);
        FlightStream stream = new FlightStream(
                this.allocator,
                PENDING_REQUESTS,
                (message, cause) -> call.cancel(message, cause),
                count -> call.request(count));
        final StreamObserver<ArrowMessage> delegate = stream.asObserver();
        ClientResponseObserver<Flight.Ticket, ArrowMessage> clientResponseObserver =
                new ClientResponseObserver<Flight.Ticket, ArrowMessage>() {

            @Override
            public void beforeStart(ClientCallStreamObserver<Flight.Ticket> requestStream) {
                // The stream requests messages itself through the callback passed above.
                requestStream.disableAutoInboundFlowControl();
            }

            @Override
            public void onNext(ArrowMessage value) {
                delegate.onNext(value);
            }

            @Override
            public void onError(Throwable t) {
                delegate.onError(t);
            }

            @Override
            public void onCompleted() {
                delegate.onCompleted();
            }
        };
        ClientCalls.asyncServerStreamingCall(call, ticket.toProtocol(), clientResponseObserver);
        return stream;
    }

    /**
     * Shuts down the channel, waits up to five seconds for it to terminate, and closes
     * this client's allocator.
     *
     * <p>The allocator is closed even when the shutdown or wait fails; a failure while
     * closing it in that case is attached to the original exception as suppressed.
     *
     * @throws InterruptedException if interrupted while waiting for the channel to terminate
     */
    @Override
    public void close() throws InterruptedException {
        try {
            this.channel.shutdown().awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException | RuntimeException e) {
            try {
                this.allocator.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.allocator.close();
    }

    /**
     * Handle returned by {@link FlightClient#startPut} for driving an upload.
     */
    public static interface ClientStreamListener {
        /** Sends the next batch of data. */
        public void putNext();

        /**
         * Signals an error to the server side of the stream.
         *
         * @param var1 the error to report
         */
        public void error(Throwable var1);

        /** Signals that no more data will be sent. */
        public void completed();

        /**
         * Waits for and returns the server's result for the put.
         *
         * @return the result of the put
         */
        public Flight.PutResult getResult();
    }

    /**
     * {@link ClientStreamListener} that unloads record batches from a {@link VectorUnloader}
     * and writes them to a gRPC request stream.
     */
    private static class PutObserver implements ClientStreamListener {
        private final ClientCallStreamObserver<ArrowMessage> observer;
        private final VectorUnloader unloader;
        private final ListenableFuture<Flight.PutResult> futureResult;

        /**
         * @param unloader source of record batches
         * @param observer request stream the batches are written to
         * @param futureResult future completed with the server's response or failure
         */
        public PutObserver(VectorUnloader unloader, ClientCallStreamObserver<ArrowMessage> observer, ListenableFuture<Flight.PutResult> futureResult) {
            this.observer = observer;
            this.unloader = unloader;
            this.futureResult = futureResult;
        }

        @Override
        public void putNext() {
            ArrowRecordBatch batch = this.unloader.getRecordBatch();
            // Spin until the request stream reports ready, or the result future is already
            // done (so we do not spin forever on a finished call).
            // NOTE(review): this is a CPU-burning busy-wait; consider an onReady-handler based wait.
            while (!this.observer.isReady() && !this.futureResult.isDone()) {
            }
            this.observer.onNext(new ArrowMessage(batch));
        }

        @Override
        public void error(Throwable ex) {
            this.observer.onError(ex);
        }

        @Override
        public void completed() {
            this.observer.onCompleted();
        }

        /**
         * Blocks until the result future completes.
         *
         * @return the server's put result
         * @throws RuntimeException if the wait is interrupted (the interrupt flag is restored)
         *     or the future completed exceptionally
         */
        @Override
        public Flight.PutResult getResult() {
            try {
                return this.futureResult.get();
            }
            catch (InterruptedException ex) {
                // Keep the interruption visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw Throwables.propagate(ex);
            }
            catch (Exception ex) {
                throw Throwables.propagate(ex);
            }
        }
    }

    /**
     * Stream observer that remembers the last value received and publishes it through a
     * future once the stream completes.
     *
     * @param <T> type of the response messages
     */
    private static class SetStreamObserver<T> implements StreamObserver<T> {
        private final SettableFuture<T> result = SettableFuture.create();
        private volatile T resultLocal;

        private SetStreamObserver() {
        }

        @Override
        public void onNext(T value) {
            this.resultLocal = value;
        }

        @Override
        public void onError(Throwable t) {
            this.result.setException(t);
        }

        @Override
        public void onCompleted() {
            T value = this.resultLocal;
            if (value == null) {
                // Fail the future instead of throwing from the callback; throwing here would
                // leave the future incomplete and block anyone waiting on it.
                this.result.setException(new IllegalStateException("Stream completed without delivering a result."));
                return;
            }
            this.result.set(value);
        }

        /**
         * @return future completed with the last received value, or failed if the stream
         *     errored or completed without a value
         */
        public ListenableFuture<T> getFuture() {
            return this.result;
        }
    }
}

