/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.flight;

import com.google.common.base.Preconditions;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.ArrowMessage;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.StreamPipe;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.auth.ServerAuthHandler;
import org.apache.arrow.flight.auth.ServerAuthWrapper;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FlightService
extends FlightServiceGrpc.FlightServiceImplBase {
    private static final Logger logger = LoggerFactory.getLogger(FlightService.class);
    private static final int PENDING_REQUESTS = 5;
    private final BufferAllocator allocator;
    private final FlightProducer producer;
    private final ServerAuthHandler authHandler;
    private final ExecutorService executors = Executors.newCachedThreadPool();

    public FlightService(BufferAllocator allocator, FlightProducer producer, ServerAuthHandler authHandler) {
        this.allocator = allocator;
        this.producer = producer;
        this.authHandler = authHandler;
    }

    @Override
    public StreamObserver<Flight.HandshakeRequest> handshake(StreamObserver<Flight.HandshakeResponse> responseObserver) {
        return ServerAuthWrapper.wrapHandshake(this.authHandler, responseObserver, this.executors);
    }

    @Override
    public void listFlights(Flight.Criteria criteria, StreamObserver<Flight.FlightGetInfo> responseObserver) {
        try {
            this.producer.listFlights(new Criteria(criteria), StreamPipe.wrap(responseObserver, t -> t.toProtocol()));
        }
        catch (Exception ex) {
            responseObserver.onError((Throwable)ex);
        }
    }

    public void doGetCustom(Flight.Ticket ticket, StreamObserver<ArrowMessage> responseObserver) {
        try {
            this.producer.getStream(new Ticket(ticket), new GetListener(responseObserver));
        }
        catch (Exception ex) {
            responseObserver.onError((Throwable)ex);
        }
    }

    @Override
    public void doAction(Flight.Action request, StreamObserver<Flight.Result> responseObserver) {
        try {
            responseObserver.onNext((Object)this.producer.doAction(new Action(request)).toProtocol());
            responseObserver.onCompleted();
        }
        catch (Exception ex) {
            responseObserver.onError((Throwable)ex);
        }
    }

    @Override
    public void listActions(Flight.Empty request, StreamObserver<Flight.ActionType> responseObserver) {
        try {
            this.producer.listActions(StreamPipe.wrap(responseObserver, t -> t.toProtocol()));
        }
        catch (Exception ex) {
            responseObserver.onError((Throwable)ex);
        }
    }

    public StreamObserver<ArrowMessage> doPutCustom(StreamObserver<Flight.PutResult> responseObserverSimple) {
        ServerCallStreamObserver responseObserver = (ServerCallStreamObserver)responseObserverSimple;
        responseObserver.disableAutoInboundFlowControl();
        responseObserver.request(1);
        FlightStream fs = new FlightStream(this.allocator, 5, null, count -> responseObserver.request(count));
        this.executors.submit(() -> {
            try {
                responseObserver.onNext((Object)this.producer.acceptPut(fs).call());
                responseObserver.onCompleted();
            }
            catch (Exception ex) {
                responseObserver.onError((Throwable)ex);
            }
        });
        return fs.asObserver();
    }

    @Override
    public void getFlightInfo(Flight.FlightDescriptor request, StreamObserver<Flight.FlightGetInfo> responseObserver) {
        try {
            FlightInfo info = this.producer.getFlightInfo(new FlightDescriptor(request));
            responseObserver.onNext((Object)info.toProtocol());
            responseObserver.onCompleted();
        }
        catch (Exception ex) {
            responseObserver.onError((Throwable)ex);
        }
    }

    private static class GetListener
    implements FlightProducer.ServerStreamListener {
        private ServerCallStreamObserver<ArrowMessage> responseObserver;
        private volatile VectorUnloader unloader;

        public GetListener(StreamObserver<ArrowMessage> responseObserver) {
            this.responseObserver = (ServerCallStreamObserver)responseObserver;
            this.responseObserver.setOnCancelHandler(() -> this.onCancel());
            this.responseObserver.disableAutoInboundFlowControl();
        }

        private void onCancel() {
            logger.debug("Stream cancelled by client.");
        }

        @Override
        public boolean isReady() {
            return this.responseObserver.isReady();
        }

        @Override
        public boolean isCancelled() {
            return this.responseObserver.isCancelled();
        }

        @Override
        public void start(VectorSchemaRoot root) {
            this.responseObserver.onNext((Object)new ArrowMessage(null, root.getSchema()));
            this.unloader = new VectorUnloader(root, true, false);
        }

        @Override
        public void putNext() {
            Preconditions.checkNotNull((Object)this.unloader);
            this.responseObserver.onNext((Object)new ArrowMessage(this.unloader.getRecordBatch()));
        }

        @Override
        public void error(Throwable ex) {
            this.responseObserver.onError(ex);
        }

        @Override
        public void completed() {
            this.responseObserver.onCompleted();
        }
    }
}

