/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.exceptions.ClientException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.container.StorageContainerChannelManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.utils.RpcUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.NamespaceProperties;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.CreateStreamResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.protocol.util.ProtoUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RootRangeClient} implementation that issues namespace and stream metadata RPCs through
 * the {@link StorageContainerChannel} obtained from the channel manager for container {@code 0}.
 *
 * <p>Every public operation follows the same shape: build a request with {@link ProtoUtils}, send
 * it through the root range service stub exposed by the resolved {@link StorageServerChannel}, and
 * translate the response's {@link StatusCode} into either a completed value or an exception
 * produced by {@link ProtocolInternalUtils#createRootRangeException}.
 */
class RootRangeClientImpl implements RootRangeClient {
    // Not referenced anywhere in this class; retained as-is.
    private static final Logger log = LoggerFactory.getLogger(RootRangeClientImpl.class);
    // Assigned in the constructor but not read anywhere in this class.
    private final ScheduledExecutorService executor;
    private final StorageContainerChannel scClient;

    /**
     * Creates the client.
     *
     * @param scheduler scheduler from which the executor keyed by {@code 0} is chosen
     * @param channelManager manager used to get or create the channel for container {@code 0}
     */
    RootRangeClientImpl(OrderedScheduler scheduler, StorageContainerChannelManager channelManager) {
        // NOTE(review): 0L presumably identifies the root storage container - confirm against the
        // protocol constants before replacing it with a named constant.
        this.executor = scheduler.chooseThread(0L);
        this.scClient = channelManager.getOrCreate(0L);
    }

    /**
     * Returns the storage container channel this client sends its RPCs through.
     *
     * @return the channel obtained in the constructor
     */
    @VisibleForTesting
    StorageContainerChannel getStorageContainerClient() {
        return this.scClient;
    }

    /**
     * Runs one root range RPC once the storage server channel future has completed.
     *
     * <p>If resolving the channel fails, the returned future is failed with a
     * {@link ClientException} wrapping the cause. Otherwise the call is delegated to
     * {@link RpcUtils#processRpc}, which receives the returned future together with the three
     * callbacks.
     *
     * <p>When the returned future completes exceptionally with a cause for which
     * {@link RpcUtils#isContainerNotFound} is true, the cached storage server channel future is
     * reset on the container channel.
     *
     * @param createRequestFunc builds the request
     * @param processRequestFunc sends the request through the root range service stub
     * @param processResponseFunc turns the response into the result, completing the given future
     * @param <T> result type of the returned future
     * @param <ReqT> request type
     * @param <RespT> response type
     * @return a future completed with the outcome of the RPC
     */
    private <T, ReqT, RespT> CompletableFuture<T> processRootRangeRpc(
            RpcUtils.CreateRequestFunc<ReqT> createRequestFunc,
            RpcUtils.ProcessRequestFunc<ReqT, RespT, RootRangeServiceGrpc.RootRangeServiceFutureStub> processRequestFunc,
            RpcUtils.ProcessResponseFunc<RespT, T> processResponseFunc) {
        CompletableFuture<T> result = FutureUtils.createFuture();
        // The callback must be registered on the very future that gets completed below. Keeping
        // the stage returned by whenComplete(...) as the result (as before) meant that completing
        // it directly never ran the callback, because the source future was never completed.
        result.whenComplete((value, cause) -> {
            if (null != cause && RpcUtils.isContainerNotFound(cause)) {
                this.scClient.resetStorageServerChannelFuture();
            }
        });

        this.scClient.getStorageContainerChannelFuture().whenComplete((rsChannel, cause) -> {
            if (null != cause) {
                this.handleGetRootRangeServiceFailure(result, cause);
                return;
            }
            RpcUtils.processRpc(
                    rsChannel.getRootRangeService(),
                    result,
                    createRequestFunc,
                    processRequestFunc,
                    processResponseFunc);
        });
        return result;
    }

    /**
     * Sends a create-namespace request.
     *
     * @param namespace name of the namespace to create
     * @param colConf configuration passed into the request
     * @return a future holding the namespace properties from the response on success
     */
    @Override
    public CompletableFuture<NamespaceProperties> createNamespace(String namespace, NamespaceConfiguration colConf) {
        return this.processRootRangeRpc(
                () -> ProtoUtils.createCreateNamespaceRequest(namespace, colConf),
                (rootRangeService, request) -> rootRangeService.createNamespace(request),
                (resp, resultFuture) -> this.processCreateNamespaceResponse(namespace, resp, resultFuture));
    }

    /**
     * Completes the future with the response's namespace properties when the code is
     * {@link StatusCode#SUCCESS}; otherwise fails it with a root range exception for the code.
     */
    private void processCreateNamespaceResponse(
            String namespace, CreateNamespaceResponse response, CompletableFuture<NamespaceProperties> createNamespaceFuture) {
        StatusCode code = response.getCode();
        if (StatusCode.SUCCESS != code) {
            createNamespaceFuture.completeExceptionally(ProtocolInternalUtils.createRootRangeException(namespace, code));
            return;
        }
        createNamespaceFuture.complete(response.getNsProps());
    }

    /**
     * Sends a delete-namespace request.
     *
     * @param namespace name of the namespace to delete
     * @return a future completed with {@code true} on success
     */
    @Override
    public CompletableFuture<Boolean> deleteNamespace(String namespace) {
        return this.processRootRangeRpc(
                () -> ProtoUtils.createDeleteNamespaceRequest(namespace),
                (rootRangeService, request) -> rootRangeService.deleteNamespace(request),
                (resp, resultFuture) -> this.processDeleteNamespaceResponse(namespace, resp, resultFuture));
    }

    /**
     * Completes the future with {@code true} when the code is {@link StatusCode#SUCCESS};
     * otherwise fails it with a root range exception for the code.
     */
    private void processDeleteNamespaceResponse(
            String namespace, DeleteNamespaceResponse response, CompletableFuture<Boolean> deleteFuture) {
        StatusCode code = response.getCode();
        if (StatusCode.SUCCESS != code) {
            deleteFuture.completeExceptionally(ProtocolInternalUtils.createRootRangeException(namespace, code));
            return;
        }
        deleteFuture.complete(true);
    }

    /**
     * Sends a get-namespace request.
     *
     * @param namespace name of the namespace to look up
     * @return a future holding the namespace properties from the response on success
     */
    @Override
    public CompletableFuture<NamespaceProperties> getNamespace(String namespace) {
        return this.processRootRangeRpc(
                () -> ProtoUtils.createGetNamespaceRequest(namespace),
                (rootRangeService, request) -> rootRangeService.getNamespace(request),
                (resp, resultFuture) -> this.processGetNamespaceResponse(namespace, resp, resultFuture));
    }

    /**
     * Completes the future with the response's namespace properties when the code is
     * {@link StatusCode#SUCCESS}; otherwise fails it with a root range exception for the code.
     */
    private void processGetNamespaceResponse(
            String namespace, GetNamespaceResponse response, CompletableFuture<NamespaceProperties> getFuture) {
        StatusCode code = response.getCode();
        if (StatusCode.SUCCESS != code) {
            getFuture.completeExceptionally(ProtocolInternalUtils.createRootRangeException(namespace, code));
            return;
        }
        getFuture.complete(response.getNsProps());
    }

    /**
     * Sends a create-stream request.
     *
     * @param colName first name argument of the request (paired with {@code streamName})
     * @param streamName name of the stream to create; also used to label a failure
     * @param streamConf configuration passed into the request
     * @return a future holding the stream properties from the response on success
     */
    @Override
    public CompletableFuture<StreamProperties> createStream(String colName, String streamName, StreamConfiguration streamConf) {
        return this.processRootRangeRpc(
                () -> ProtoUtils.createCreateStreamRequest(colName, streamName, streamConf),
                (rootRangeService, request) -> rootRangeService.createStream(request),
                (resp, resultFuture) -> this.processCreateStreamResponse(streamName, resp, resultFuture));
    }

    /**
     * Completes the future with the response's stream properties when the code is
     * {@link StatusCode#SUCCESS}; otherwise fails it with a root range exception for the code.
     */
    private void processCreateStreamResponse(
            String streamName, CreateStreamResponse response, CompletableFuture<StreamProperties> createStreamFuture) {
        StatusCode code = response.getCode();
        if (StatusCode.SUCCESS != code) {
            createStreamFuture.completeExceptionally(ProtocolInternalUtils.createRootRangeException(streamName, code));
            return;
        }
        createStreamFuture.complete(response.getStreamProps());
    }

    /**
     * Sends a get-stream request addressed by names.
     *
     * @param colName first name argument of the request (paired with {@code streamName})
     * @param streamName name of the stream to look up; also used to label a failure
     * @return a future holding the stream properties from the response on success
     */
    @Override
    public CompletableFuture<StreamProperties> getStream(String colName, String streamName) {
        return this.processRootRangeRpc(
                () -> ProtoUtils.createGetStreamRequest(colName, streamName),
                (rootRangeService, request) -> rootRangeService.getStream(request),
                (resp, resultFuture) -> this.processGetStreamResponse(streamName, resp, resultFuture));
    }

    /**
     * Sends a get-stream request addressed by id. A failure is labelled with
     * {@code "Stream(<id>)"} since no name is available.
     *
     * @param streamId id of the stream to look up
     * @return a future holding the stream properties from the response on success
     */
    @Override
    public CompletableFuture<StreamProperties> getStream(long streamId) {
        return this.processRootRangeRpc(
                () -> ProtoUtils.createGetStreamRequest(streamId),
                (rootRangeService, request) -> rootRangeService.getStream(request),
                (resp, resultFuture) -> this.processGetStreamResponse("Stream(" + streamId + ")", resp, resultFuture));
    }

    /**
     * Completes the future with the response's stream properties when the code is
     * {@link StatusCode#SUCCESS}; otherwise fails it with a root range exception for the code.
     */
    private void processGetStreamResponse(
            String streamName, GetStreamResponse response, CompletableFuture<StreamProperties> getStreamFuture) {
        StatusCode code = response.getCode();
        if (StatusCode.SUCCESS != code) {
            getStreamFuture.completeExceptionally(ProtocolInternalUtils.createRootRangeException(streamName, code));
            return;
        }
        getStreamFuture.complete(response.getStreamProps());
    }

    /**
     * Sends a delete-stream request.
     *
     * @param colName first name argument of the request (paired with {@code streamName})
     * @param streamName name of the stream to delete; also used to label a failure
     * @return a future completed with {@code true} on success
     */
    @Override
    public CompletableFuture<Boolean> deleteStream(String colName, String streamName) {
        return this.processRootRangeRpc(
                () -> ProtoUtils.createDeleteStreamRequest(colName, streamName),
                (rootRangeService, request) -> rootRangeService.deleteStream(request),
                (resp, resultFuture) -> this.processDeleteStreamResponse(streamName, resp, resultFuture));
    }

    /**
     * Completes the future with {@code true} when the code is {@link StatusCode#SUCCESS};
     * otherwise fails it with a root range exception for the code.
     */
    private void processDeleteStreamResponse(
            String streamName, DeleteStreamResponse response, CompletableFuture<Boolean> deleteStreamFuture) {
        StatusCode code = response.getCode();
        if (StatusCode.SUCCESS != code) {
            deleteStreamFuture.completeExceptionally(ProtocolInternalUtils.createRootRangeException(streamName, code));
            return;
        }
        deleteStreamFuture.complete(true);
    }

    /**
     * Fails the given future with a {@link ClientException} that wraps the cause of a failed
     * storage server channel lookup.
     */
    private void handleGetRootRangeServiceFailure(CompletableFuture<?> future, Throwable cause) {
        future.completeExceptionally(new ClientException("GetRootRangeService is unexpected to fail", cause));
    }
}

