/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.grpc;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.function.SqlFunctionResult;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.grpc.GrpcSqlFunctionExecutionConfig;
import com.facebook.presto.grpc.api.udf.GrpcUtils;
import com.facebook.presto.grpc.udf.GrpcFunctionHandle;
import com.facebook.presto.grpc.udf.GrpcSerializedPage;
import com.facebook.presto.grpc.udf.GrpcUdfInvokeGrpc;
import com.facebook.presto.grpc.udf.GrpcUdfPage;
import com.facebook.presto.grpc.udf.GrpcUdfPageFormat;
import com.facebook.presto.grpc.udf.GrpcUdfRequest;
import com.facebook.presto.grpc.udf.GrpcUdfResult;
import com.facebook.presto.spi.function.FunctionImplementationType;
import com.facebook.presto.spi.function.RemoteScalarFunctionImplementation;
import com.facebook.presto.spi.function.RoutineCharacteristics;
import com.facebook.presto.spi.function.SqlFunctionExecutor;
import com.facebook.presto.spi.function.SqlFunctionHandle;
import com.facebook.presto.spi.function.SqlFunctionId;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class GrpcSqlFunctionExecutor
implements SqlFunctionExecutor {
    private static final int DEFAULT_RETRY_ATTEMPTS = 3;
    private final Map<RoutineCharacteristics.Language, GrpcSqlFunctionExecutionConfig> grpcUdfConfigs;
    private final Map<RoutineCharacteristics.Language, GrpcUdfInvokeGrpc.GrpcUdfInvokeFutureStub> futureStubs = new HashMap<RoutineCharacteristics.Language, GrpcUdfInvokeGrpc.GrpcUdfInvokeFutureStub>();
    private BlockEncodingSerde blockEncodingSerde;

    @Inject
    public GrpcSqlFunctionExecutor(Map<RoutineCharacteristics.Language, GrpcSqlFunctionExecutionConfig> grpcUdfConfigs) {
        this.grpcUdfConfigs = Objects.requireNonNull(grpcUdfConfigs, "grpcUdfConfigs is null");
        grpcUdfConfigs.entrySet().forEach(entry -> {
            String grpcAddress = ((GrpcSqlFunctionExecutionConfig)entry.getValue()).getGrpcAddress();
            ManagedChannel channel = ManagedChannelBuilder.forTarget((String)grpcAddress).build();
            this.futureStubs.put((RoutineCharacteristics.Language)entry.getKey(), GrpcUdfInvokeGrpc.newFutureStub((Channel)channel));
        });
    }

    public FunctionImplementationType getImplementationType() {
        return FunctionImplementationType.GRPC;
    }

    public void setBlockEncodingSerde(BlockEncodingSerde blockEncodingSerde) {
        Preconditions.checkState((this.blockEncodingSerde == null ? 1 : 0) != 0, (Object)"blockEncodingSerde already set");
        Preconditions.checkArgument((blockEncodingSerde != null ? 1 : 0) != 0, (Object)"blockEncodingSerde is null");
        this.blockEncodingSerde = blockEncodingSerde;
    }

    public CompletableFuture<SqlFunctionResult> executeFunction(String source, RemoteScalarFunctionImplementation functionImplementation, Page input, List<Integer> channels, List<Type> argumentTypes, Type returnType) {
        GrpcUdfPage grpcUdfPage = this.buildGrpcUdfPage(input, channels, this.grpcUdfConfigs.get(functionImplementation.getLanguage()).getGrpcUdfPageFormat());
        SqlFunctionHandle functionHandle = functionImplementation.getFunctionHandle();
        SqlFunctionId functionId = functionHandle.getFunctionId();
        GrpcFunctionHandle grpcFunctionHandle = GrpcFunctionHandle.newBuilder().setFunctionName(functionId.getFunctionName().toString()).addAllArgumentTypes((Iterable)functionId.getArgumentTypes().stream().map(TypeSignature::toString).collect(ImmutableList.toImmutableList())).setReturnType(returnType.toString()).setVersion(functionHandle.getVersion()).build();
        GrpcUdfRequest grpcUdfRequest = GrpcUdfRequest.newBuilder().setSource(source).setGrpcFunctionHandle(grpcFunctionHandle).setInputs(grpcUdfPage).build();
        return this.invokeUdfWithRetry(this.futureStubs.get(functionImplementation.getLanguage()), grpcUdfRequest).thenApply(grpcResult -> this.toSqlFunctionResult((GrpcUdfResult)grpcResult));
    }

    private CompletableFuture<GrpcUdfResult> invokeUdf(GrpcUdfInvokeGrpc.GrpcUdfInvokeFutureStub futureStub, GrpcUdfRequest grpcUdfRequest) {
        try {
            return MoreFutures.toCompletableFuture(futureStub.invokeUdf(grpcUdfRequest));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private CompletableFuture<GrpcUdfResult> invokeUdfWithRetry(GrpcUdfInvokeGrpc.GrpcUdfInvokeFutureStub futureStub, GrpcUdfRequest grpcUdfRequest) {
        CompletionStage<GrpcUdfResult> resultFuture = this.invokeUdf(futureStub, grpcUdfRequest);
        for (int i = 0; i < 3; ++i) {
            resultFuture = ((CompletableFuture)((CompletableFuture)resultFuture.thenApply(CompletableFuture::completedFuture)).exceptionally(t -> this.invokeUdf(futureStub, grpcUdfRequest))).thenCompose(Function.identity());
        }
        return resultFuture;
    }

    private GrpcUdfPage buildGrpcUdfPage(Page input, List<Integer> channels, GrpcUdfPageFormat grpcUdfPageFormat) {
        Block[] blocks = new Block[channels.size()];
        for (int i = 0; i < channels.size(); ++i) {
            blocks[i] = input.getBlock(channels.get(i).intValue());
        }
        switch (grpcUdfPageFormat) {
            case Presto: {
                Preconditions.checkState((this.blockEncodingSerde != null ? 1 : 0) != 0, (Object)"blockEncodingSerde not set");
                Page inputPage = Page.wrapBlocksWithoutCopy((int)input.getPositionCount(), (Block[])blocks);
                GrpcSerializedPage grpcSerializedPage = GrpcUtils.toGrpcSerializedPage(this.blockEncodingSerde, inputPage);
                return GrpcUtils.toGrpcUdfPage(grpcUdfPageFormat, grpcSerializedPage);
            }
        }
        throw new IllegalArgumentException(String.format("Unknown page format: %s", new Object[]{grpcUdfPageFormat}));
    }

    private SqlFunctionResult toSqlFunctionResult(GrpcUdfResult grpcUdfResult) {
        Preconditions.checkState((this.blockEncodingSerde != null ? 1 : 0) != 0, (Object)"blockEncodingSerde not set");
        GrpcUdfPage grpcUdfPage = grpcUdfResult.getResult();
        switch (grpcUdfPage.getGrpcUdfPageFormat()) {
            case Presto: {
                Page resultPage = GrpcUtils.toPrestoPage(this.blockEncodingSerde, grpcUdfPage.getGrpcSerializedPage());
                return new SqlFunctionResult(resultPage.getBlock(0), grpcUdfResult.getUdfStats().getTotalCpuTimeMs());
            }
        }
        throw new IllegalArgumentException(String.format("Unknown page format: %s", new Object[]{grpcUdfPage.getGrpcUdfPageFormat()}));
    }
}

