/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.control;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.fnexecution.control.BundleCheckpointHandler;
import org.apache.beam.runners.fnexecution.control.BundleFinalizationHandler;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.BundleSplitHandler;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.RemoteOutputReceiver;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side facade for driving bundle processing on an SDK harness.
 *
 * <p>Instruction requests are sent through an {@link InstructionRequestHandler} and element/timer
 * data is exchanged through a {@link FnDataService}. {@link BundleProcessor}s are created on
 * demand and cached by the id of their process bundle descriptor.
 */
public class SdkHarnessClient
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(SdkHarnessClient.class);
    // Source of ids for bundles and for progress/split instruction requests.
    private final IdGenerator idGenerator;
    // Handler used to register descriptors and to send instruction requests.
    private final InstructionRequestHandler fnApiControlClient;
    // Service used to open the inbound and outbound data/timer streams of a bundle.
    private final FnDataService fnApiDataService;
    // Cache of bundle processors, keyed by process bundle descriptor id.
    private final ConcurrentHashMap<String, BundleProcessor> clientProcessors;

    /**
     * Creates a client over the given control handler and data service, starting with an empty
     * processor cache.
     *
     * @param fnApiControlClient handler used for all instruction requests
     * @param fnApiDataService service used to open data and timer streams
     * @param idGenerator generator for bundle and instruction ids
     */
    private SdkHarnessClient(InstructionRequestHandler fnApiControlClient, FnDataService fnApiDataService, IdGenerator idGenerator) {
        this.fnApiControlClient = fnApiControlClient;
        this.fnApiDataService = fnApiDataService;
        this.idGenerator = idGenerator;
        this.clientProcessors = new ConcurrentHashMap<>();
    }

    /**
     * Exposes the control handler this client sends its instruction requests through.
     *
     * @return the handler supplied when this client was created
     */
    public InstructionRequestHandler getInstructionRequestHandler() {
        return fnApiControlClient;
    }

    /**
     * Creates a client that uses the given control handler and data service and generates ids
     * with {@link IdGenerators#incrementingLongs()}.
     *
     * @param fnApiControlClient handler used for all instruction requests
     * @param fnApiDataService service used to open data and timer streams
     * @return a new client with an empty processor cache
     */
    public static SdkHarnessClient usingFnApiClient(InstructionRequestHandler fnApiControlClient, FnDataService fnApiDataService) {
        IdGenerator incrementingIds = IdGenerators.incrementingLongs();
        return new SdkHarnessClient(fnApiControlClient, fnApiDataService, incrementingIds);
    }

    /**
     * Returns a new client that shares this client's control handler and data service but draws
     * ids from the given generator. The new client starts with its own empty processor cache.
     *
     * @param idGenerator generator for bundle and instruction ids
     * @return a new client; this instance is left unchanged
     */
    public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) {
        return new SdkHarnessClient(fnApiControlClient, fnApiDataService, idGenerator);
    }

    /**
     * Returns the processor for a descriptor that uses neither state nor timers.
     *
     * @param descriptor descriptor to process bundles for; must not declare a state API service
     *     descriptor
     * @param remoteInputDesinations inputs the runner will feed to each bundle
     * @return the cached or newly registered processor
     * @throws IllegalStateException if the descriptor declares a state API service descriptor
     */
    public BundleProcessor getProcessor(BeamFnApi.ProcessBundleDescriptor descriptor, List<RemoteInputDestination> remoteInputDesinations) {
        boolean stateless = !descriptor.hasStateApiServiceDescriptor();
        Preconditions.checkState(
            stateless,
            "The %s cannot support a %s containing a state %s.",
            BundleProcessor.class.getSimpleName(),
            BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(),
            Endpoints.ApiServiceDescriptor.class.getSimpleName());
        // No state endpoint was declared, so state registrations are no-ops.
        return getProcessor(descriptor, remoteInputDesinations, NoOpStateDelegator.INSTANCE);
    }

    /**
     * Returns the processor for a descriptor that may use state but declares no timers.
     *
     * @param descriptor descriptor to process bundles for; must not declare a timer API service
     *     descriptor
     * @param remoteInputDesinations inputs the runner will feed to each bundle
     * @param stateDelegator delegator that state handlers are registered with per bundle
     * @return the cached or newly registered processor
     * @throws IllegalStateException if the descriptor declares a timer API service descriptor
     */
    public BundleProcessor getProcessor(BeamFnApi.ProcessBundleDescriptor descriptor, List<RemoteInputDestination> remoteInputDesinations, StateDelegator stateDelegator) {
        boolean timerless = !descriptor.hasTimerApiServiceDescriptor();
        Preconditions.checkState(
            timerless,
            "The %s cannot support a %s containing a timer %s.",
            BundleProcessor.class.getSimpleName(),
            BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(),
            Endpoints.ApiServiceDescriptor.class.getSimpleName());
        // No timers were declared, so the processor gets an empty timer spec map.
        return getProcessor(descriptor, remoteInputDesinations, stateDelegator, Collections.emptyMap());
    }

    /**
     * Returns the processor for the given descriptor, creating and registering it on first use.
     *
     * <p>Processors are cached by descriptor id. A later call with the same id must supply an
     * equal descriptor.
     *
     * @param descriptor descriptor to process bundles for
     * @param remoteInputDestinations inputs the runner will feed to each bundle
     * @param stateDelegator delegator that state handlers are registered with per bundle
     * @param timerSpecs timer specifications used to open outbound timer streams per bundle
     * @return the cached or newly registered processor
     * @throws IllegalArgumentException if a processor with the same id but a different
     *     descriptor is already cached
     */
    public BundleProcessor getProcessor(BeamFnApi.ProcessBundleDescriptor descriptor, List<RemoteInputDestination> remoteInputDestinations, StateDelegator stateDelegator, Map<String, Map<String, ProcessBundleDescriptors.TimerSpec>> timerSpecs) {
        String descriptorId = descriptor.getId();
        BundleProcessor cached =
            clientProcessors.computeIfAbsent(
                descriptorId,
                unusedId -> create(descriptor, remoteInputDestinations, timerSpecs, stateDelegator));
        boolean sameContents = cached.processBundleDescriptor.equals(descriptor);
        Preconditions.checkArgument(
            sameContents,
            "The provided %s with id %s collides with an existing %s with the same id but containing different contents.",
            BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(),
            descriptorId,
            BeamFnApi.ProcessBundleDescriptor.class.getSimpleName());
        return cached;
    }

    /**
     * Registers the descriptor through the control handler and wraps it in a new processor.
     *
     * @param processBundleDescriptor descriptor to register
     * @param remoteInputDestinations inputs the runner will feed to each bundle
     * @param timerSpecs timer specifications used to open outbound timer streams per bundle
     * @param stateDelegator delegator that state handlers are registered with per bundle
     * @return a new, uncached processor
     */
    private BundleProcessor create(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, List<RemoteInputDestination> remoteInputDestinations, Map<String, Map<String, ProcessBundleDescriptors.TimerSpec>> timerSpecs, StateDelegator stateDelegator) {
        LOG.debug("Registering {}", processBundleDescriptor);
        fnApiControlClient.registerProcessBundleDescriptor(processBundleDescriptor);
        return new BundleProcessor(processBundleDescriptor, remoteInputDestinations, timerSpecs, stateDelegator);
    }

    /** Does nothing; this method performs no cleanup of the control handler or data service. */
    @Override
    public void close() {
        // Intentionally empty.
    }

    private static class CountingFnDataReceiver<T>
    implements CloseableFnDataReceiver<T> {
        private final CloseableFnDataReceiver delegate;
        private long count;

        private CountingFnDataReceiver(CloseableFnDataReceiver delegate) {
            this.delegate = delegate;
        }

        public long getCount() {
            return this.count;
        }

        public void accept(T input) throws Exception {
            this.delegate.accept(input);
            ++this.count;
        }

        public void flush() throws Exception {
            this.delegate.flush();
        }

        public void close() throws Exception {
            this.delegate.close();
        }
    }

    /**
     * A {@link StateDelegator} singleton that ignores every registration and always hands back
     * the same do-nothing {@link Registration}.
     */
    private static class NoOpStateDelegator
    implements StateDelegator {
        private static final NoOpStateDelegator INSTANCE = new NoOpStateDelegator();

        /** Singleton; use {@link #INSTANCE}. */
        private NoOpStateDelegator() {
        }

        /** Ignores both arguments and returns the shared no-op registration. */
        @Override
        public Registration registerForProcessBundleInstructionId(String processBundleInstructionId, StateRequestHandler handler) {
            return Registration.INSTANCE;
        }

        /** A registration whose {@code deregister} and {@code abort} do nothing. */
        private static class Registration
        implements StateDelegator.Registration {
            private static final Registration INSTANCE = new Registration();

            /** Singleton; use {@link #INSTANCE}. */
            private Registration() {
            }

            @Override
            public void deregister() {
                // Nothing was registered, so there is nothing to remove.
            }

            @Override
            public void abort() {
                // Nothing was registered, so there is nothing to abort.
            }
        }
    }

    /**
     * Starts bundles for one registered process bundle descriptor. Instances are created by the
     * enclosing client and use its id generator, control handler and data service.
     */
    public class BundleProcessor {
        // Descriptor whose id is sent in every process bundle request made by this processor.
        private final BeamFnApi.ProcessBundleDescriptor processBundleDescriptor;
        // Inputs for which an outbound data stream is opened on every new bundle.
        private final List<RemoteInputDestination> remoteInputs;
        // Timer specifications for which an outbound timer stream is opened on every new bundle.
        private final Map<String, Map<String, ProcessBundleDescriptors.TimerSpec>> timerSpecs;
        // Delegator that each bundle's state request handler is registered with.
        private final StateDelegator stateDelegator;

        /**
         * Stores the descriptor and the per-bundle wiring information; performs no registration
         * itself.
         *
         * @param processBundleDescriptor descriptor this processor starts bundles for
         * @param remoteInputs inputs to open outbound data streams for
         * @param timerSpecs timer specifications to open outbound timer streams for
         * @param stateDelegator delegator that state handlers are registered with per bundle
         */
        private BundleProcessor(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, List<RemoteInputDestination> remoteInputs, Map<String, Map<String, ProcessBundleDescriptors.TimerSpec>> timerSpecs, StateDelegator stateDelegator) {
            this.stateDelegator = stateDelegator;
            this.timerSpecs = timerSpecs;
            this.remoteInputs = remoteInputs;
            this.processBundleDescriptor = processBundleDescriptor;
        }

        /**
         * Starts a bundle that does not support state: any state request reaching the installed
         * handler fails with {@link UnsupportedOperationException}.
         *
         * @param outputReceivers receivers for the bundle's outputs, keyed by transform id
         * @param progressHandler handler notified of progress and completion
         * @return the newly started bundle
         */
        public ActiveBundle newBundle(Map<String, RemoteOutputReceiver<?>> outputReceivers, BundleProgressHandler progressHandler) {
            StateRequestHandler rejectStateRequests = request -> {
                throw new UnsupportedOperationException(String.format("The %s does not have a registered state handler.", ActiveBundle.class.getSimpleName()));
            };
            return newBundle(outputReceivers, rejectStateRequests, progressHandler);
        }

        /**
         * Starts a bundle with state support but without timers, splitting, checkpointing or
         * finalization. The installed checkpoint and finalization handlers throw
         * {@link UnsupportedOperationException} when invoked.
         *
         * @param outputReceivers receivers for the bundle's outputs, keyed by transform id
         * @param stateRequestHandler handler for state requests issued during the bundle
         * @param progressHandler handler notified of progress and completion
         * @return the newly started bundle
         */
        public ActiveBundle newBundle(Map<String, RemoteOutputReceiver<?>> outputReceivers, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler) {
            BundleCheckpointHandler rejectCheckpoints = request -> {
                throw new UnsupportedOperationException(String.format("The %s does not have a registered bundle checkpoint handler.", ActiveBundle.class.getSimpleName()));
            };
            BundleFinalizationHandler rejectFinalization = bundleId -> {
                throw new UnsupportedOperationException(String.format("The %s does not have a registered bundle finalization handler.", ActiveBundle.class.getSimpleName()));
            };
            return newBundle(
                outputReceivers,
                Collections.emptyMap(),
                stateRequestHandler,
                progressHandler,
                BundleSplitHandler.unsupported(),
                rejectCheckpoints,
                rejectFinalization);
        }

        /**
         * Starts a bundle with timers and optional checkpoint/finalization handling, without
         * split support.
         *
         * <p>Note the parameter order: the finalization handler comes before the checkpoint
         * handler here, the reverse of the seven-argument overload.
         *
         * @param outputReceivers receivers for the bundle's outputs, keyed by transform id
         * @param timerReceivers receivers for timers, keyed by (transform id, timer family id)
         * @param stateRequestHandler handler for state requests issued during the bundle
         * @param progressHandler handler notified of progress and completion
         * @param finalizationHandler finalization handler, or {@code null} to install one that
         *     throws {@link UnsupportedOperationException}
         * @param checkpointHandler checkpoint handler, or {@code null} to install one that
         *     throws {@link UnsupportedOperationException}
         * @return the newly started bundle
         */
        public ActiveBundle newBundle(Map<String, RemoteOutputReceiver<?>> outputReceivers, Map<KV<String, String>, RemoteOutputReceiver<Timer<?>>> timerReceivers, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler, BundleFinalizationHandler finalizationHandler, BundleCheckpointHandler checkpointHandler) {
            BundleCheckpointHandler effectiveCheckpointHandler = checkpointHandler;
            if (effectiveCheckpointHandler == null) {
                effectiveCheckpointHandler = request -> {
                    throw new UnsupportedOperationException(String.format("The %s does not have a registered bundle checkpoint handler.", ActiveBundle.class.getSimpleName()));
                };
            }
            BundleFinalizationHandler effectiveFinalizationHandler = finalizationHandler;
            if (effectiveFinalizationHandler == null) {
                effectiveFinalizationHandler = bundleId -> {
                    throw new UnsupportedOperationException(String.format("The %s does not have a registered bundle finalization handler.", ActiveBundle.class.getSimpleName()));
                };
            }
            return newBundle(
                outputReceivers,
                timerReceivers,
                stateRequestHandler,
                progressHandler,
                BundleSplitHandler.unsupported(),
                effectiveCheckpointHandler,
                effectiveFinalizationHandler);
        }

        /**
         * Starts a new bundle: sends the process bundle request, attaches the output and timer
         * receivers, opens the outbound data and timer streams, and registers the state handler.
         *
         * <p>The steps run in exactly that order. The request is sent before any receiver is
         * attached, and the state handler is registered last.
         *
         * @param outputReceivers receivers for the bundle's outputs, keyed by transform id
         * @param timerReceivers receivers for timers, keyed by (transform id, timer family id)
         * @param stateRequestHandler handler for state requests; also supplies the cache tokens
         *     sent with the request
         * @param progressHandler handler notified of progress and completion
         * @param splitHandler handler notified of non-empty split responses
         * @param checkpointHandler handler invoked on close when the response has residual roots
         * @param finalizationHandler handler invoked on close when the response requires
         *     finalization
         * @return the newly started bundle
         */
        public ActiveBundle newBundle(Map<String, RemoteOutputReceiver<?>> outputReceivers, Map<KV<String, String>, RemoteOutputReceiver<Timer<?>>> timerReceivers, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler, BundleSplitHandler splitHandler, BundleCheckpointHandler checkpointHandler, BundleFinalizationHandler finalizationHandler) {
            String bundleId = SdkHarnessClient.this.idGenerator.getId();
            CompletionStage<BeamFnApi.InstructionResponse> genericResponse = SdkHarnessClient.this.fnApiControlClient.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(bundleId).setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId(this.processBundleDescriptor.getId()).addAllCacheTokens(stateRequestHandler.getCacheTokens())).build());
            LOG.debug("Sent {} with ID {} for {} with ID {}", BeamFnApi.ProcessBundleRequest.class.getSimpleName(), bundleId, BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(), this.processBundleDescriptor.getId());
            CompletionStage<BeamFnApi.ProcessBundleResponse> specificResponse = genericResponse.thenApply(BeamFnApi.InstructionResponse::getProcessBundle);

            // Inbound side: one client per output PCollection and per timer family.
            Map<LogicalEndpoint, InboundDataClient> outputClients = new HashMap<>();
            for (Map.Entry<String, RemoteOutputReceiver<?>> outputEntry : outputReceivers.entrySet()) {
                LogicalEndpoint endpoint = LogicalEndpoint.data(bundleId, outputEntry.getKey());
                outputClients.put(endpoint, this.attachReceiver(endpoint, outputEntry.getValue()));
            }
            // The entry type matches the declared map type, so no casts are needed on the key.
            for (Map.Entry<KV<String, String>, RemoteOutputReceiver<Timer<?>>> timerEntry : timerReceivers.entrySet()) {
                KV<String, String> transformAndFamily = timerEntry.getKey();
                LogicalEndpoint endpoint = LogicalEndpoint.timer(bundleId, transformAndFamily.getKey(), transformAndFamily.getValue());
                outputClients.put(endpoint, this.attachReceiver(endpoint, timerEntry.getValue()));
            }

            // Outbound side: data inputs are wrapped so split() can report how many elements
            // were sent; timer streams are not counted.
            ImmutableMap.Builder<LogicalEndpoint, CloseableFnDataReceiver> receiverBuilder = ImmutableMap.builder();
            for (RemoteInputDestination remoteInput : this.remoteInputs) {
                LogicalEndpoint endpoint = LogicalEndpoint.data(bundleId, remoteInput.getPTransformId());
                receiverBuilder.put(endpoint, new CountingFnDataReceiver(SdkHarnessClient.this.fnApiDataService.send(endpoint, remoteInput.getCoder())));
            }
            for (Map.Entry<String, Map<String, ProcessBundleDescriptors.TimerSpec>> transformTimers : this.timerSpecs.entrySet()) {
                for (ProcessBundleDescriptors.TimerSpec timerSpec : transformTimers.getValue().values()) {
                    LogicalEndpoint endpoint = LogicalEndpoint.timer(bundleId, timerSpec.transformId(), timerSpec.timerId());
                    receiverBuilder.put(endpoint, SdkHarnessClient.this.fnApiDataService.send(endpoint, timerSpec.coder()));
                }
            }
            Map<LogicalEndpoint, CloseableFnDataReceiver> inputReceivers = receiverBuilder.build();

            return new ActiveBundle(bundleId, specificResponse, inputReceivers, outputClients, this.stateDelegator.registerForProcessBundleInstructionId(bundleId, stateRequestHandler), progressHandler, splitHandler, checkpointHandler, finalizationHandler);
        }

        /**
         * Subscribes the receiver to the given endpoint through the data service.
         *
         * @param endpoint logical endpoint to receive from
         * @param receiver supplies the coder and the consumer for the received elements
         * @return the client returned by the data service for this subscription
         */
        private <OutputT> InboundDataClient attachReceiver(LogicalEndpoint endpoint, RemoteOutputReceiver<OutputT> receiver) {
            FnDataService dataService = SdkHarnessClient.this.fnApiDataService;
            return dataService.receive(endpoint, receiver.getCoder(), receiver.getReceiver());
        }

        /**
         * A bundle whose process bundle request has already been sent. Gives access to the
         * receivers used to feed it, allows progress and split requests while it runs, and
         * finishes it on {@link #close()}.
         */
        public class ActiveBundle
        implements RemoteBundle {
            // Instruction id of the process bundle request; also returned by getId().
            private final String bundleId;
            // Completes with the process bundle response for this bundle.
            private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
            // Outbound data and timer streams, closed first in close().
            private final Map<LogicalEndpoint, CloseableFnDataReceiver> inputReceivers;
            // Inbound clients, awaited on success or cancelled on failure in close().
            private final Map<LogicalEndpoint, InboundDataClient> outputClients;
            // State handler registration, deregistered on success or aborted on failure in close().
            private final StateDelegator.Registration stateRegistration;
            private final BundleProgressHandler progressHandler;
            private final BundleSplitHandler splitHandler;
            private final BundleCheckpointHandler checkpointHandler;
            private final BundleFinalizationHandler finalizationHandler;
            // Tracks in-flight progress/split requests; created with one party, which close() arrives on.
            private final Phaser outstandingRequests;
            // Makes close() run its body at most once.
            private final AtomicBoolean isClosed;
            // Set under synchronized(this) once the response completes; stops new progress/split requests.
            private boolean bundleIsCompleted;

            /**
             * Stores the collaborators of an already-started bundle and arranges for the bundle
             * to be marked completed as soon as its response completes, normally or
             * exceptionally.
             */
            private ActiveBundle(String bundleId, CompletionStage<BeamFnApi.ProcessBundleResponse> response, Map<LogicalEndpoint, CloseableFnDataReceiver> inputReceivers, Map<LogicalEndpoint, InboundDataClient> outputClients, StateDelegator.Registration stateRegistration, BundleProgressHandler progressHandler, BundleSplitHandler splitHandler, BundleCheckpointHandler checkpointHandler, BundleFinalizationHandler finalizationHandler) {
                this.bundleId = bundleId;
                this.response = response;

                this.inputReceivers = inputReceivers;
                this.outputClients = outputClients;
                this.stateRegistration = stateRegistration;

                this.progressHandler = progressHandler;
                this.splitHandler = splitHandler;
                this.checkpointHandler = checkpointHandler;
                this.finalizationHandler = finalizationHandler;

                // One party is registered up front; close() arrives on it.
                this.outstandingRequests = new Phaser(1);
                this.isClosed = new AtomicBoolean(false);

                // Registered last so every field is assigned before the callback can run.
                this.response.whenComplete((ignoredResponse, ignoredFailure) -> {
                    synchronized (ActiveBundle.this) {
                        this.bundleIsCompleted = true;
                    }
                });
            }

            /** Returns the instruction id under which this bundle's request was sent. */
            @Override
            public String getId() {
                return bundleId;
            }

            /**
             * Returns the receivers for the bundle's data (non-timer) inputs.
             *
             * @return an immutable map from transform id to the receiver feeding that transform
             */
            @Override
            public Map<String, FnDataReceiver> getInputReceivers() {
                ImmutableMap.Builder<String, FnDataReceiver> dataReceivers = ImmutableMap.builder();
                this.inputReceivers.forEach((endpoint, receiver) -> {
                    if (!endpoint.isTimer()) {
                        dataReceivers.put(endpoint.getTransformId(), receiver);
                    }
                });
                return dataReceivers.build();
            }

            /**
             * Returns the receivers for the bundle's timer inputs.
             *
             * @return an immutable map from (transform id, timer family id) to the receiver for
             *     that timer family
             */
            @Override
            public Map<KV<String, String>, FnDataReceiver<Timer>> getTimerReceivers() {
                ImmutableMap.Builder<KV<String, String>, FnDataReceiver<Timer>> timerReceivers = ImmutableMap.builder();
                this.inputReceivers.forEach((endpoint, receiver) -> {
                    if (endpoint.isTimer()) {
                        timerReceivers.put(KV.of(endpoint.getTransformId(), endpoint.getTimerFamilyId()), receiver);
                    }
                });
                return timerReceivers.build();
            }

            /**
             * Asks the SDK harness for the bundle's current progress and forwards any non-empty
             * answer to the progress handler. Does nothing if the bundle has already completed.
             *
             * <p>The request is tracked in {@code outstandingRequests} so that {@link #close()}
             * waits for it before reporting completion.
             */
            @Override
            public void requestProgress() {
                synchronized (this) {
                    if (this.bundleIsCompleted) {
                        return;
                    }
                    this.outstandingRequests.register();
                }
                CompletionStage<BeamFnApi.InstructionResponse> progressResponse;
                try {
                    BeamFnApi.InstructionRequest request = BeamFnApi.InstructionRequest.newBuilder().setInstructionId(SdkHarnessClient.this.idGenerator.getId()).setProcessBundleProgress(BeamFnApi.ProcessBundleProgressRequest.newBuilder().setInstructionId(this.bundleId).build()).build();
                    progressResponse = SdkHarnessClient.this.fnApiControlClient.handle(request);
                } catch (RuntimeException e) {
                    // Balance the register() above; otherwise close() would wait forever for a
                    // request that was never sent.
                    this.outstandingRequests.arrive();
                    throw e;
                }
                progressResponse.whenComplete((instructionResponse, throwable) -> {
                    // On failure the response is null; there is nothing to forward.
                    if (throwable != null || instructionResponse == null) {
                        return;
                    }
                    // Don't forward empty responses.
                    if (BeamFnApi.ProcessBundleProgressResponse.getDefaultInstance().equals(instructionResponse.getProcessBundleProgress())) {
                        return;
                    }
                    this.progressHandler.onProgress(instructionResponse.getProcessBundleProgress());
                }).whenComplete((instructionResponse, throwable) -> this.outstandingRequests.arrive());
            }

            /**
             * Asks the SDK harness to split the bundle and forwards any non-empty answer to the
             * split handler. Does nothing if the bundle has already completed.
             *
             * <p>A desired split is sent for every data (non-timer) input, carrying the number
             * of elements sent to that input so far. The request is tracked in
             * {@code outstandingRequests} so that {@link #close()} waits for it.
             *
             * @param fractionOfRemainder fraction of the remaining work to keep, passed through
             *     unchanged in each desired split
             */
            @Override
            public void split(double fractionOfRemainder) {
                synchronized (this) {
                    if (this.bundleIsCompleted) {
                        return;
                    }
                    this.outstandingRequests.register();
                }
                CompletionStage<BeamFnApi.InstructionResponse> splitResponse;
                try {
                    Map<String, BeamFnApi.ProcessBundleSplitRequest.DesiredSplit> splits = new HashMap<>();
                    for (Map.Entry<LogicalEndpoint, CloseableFnDataReceiver> ptransformToInput : this.inputReceivers.entrySet()) {
                        if (ptransformToInput.getKey().isTimer()) {
                            continue;
                        }
                        // Data inputs are always wrapped in CountingFnDataReceiver by newBundle.
                        long sentSoFar = ((CountingFnDataReceiver<?>) ptransformToInput.getValue()).getCount();
                        splits.put(ptransformToInput.getKey().getTransformId(), BeamFnApi.ProcessBundleSplitRequest.DesiredSplit.newBuilder().setFractionOfRemainder(fractionOfRemainder).setEstimatedInputElements(sentSoFar).build());
                    }
                    BeamFnApi.InstructionRequest request = BeamFnApi.InstructionRequest.newBuilder().setInstructionId(SdkHarnessClient.this.idGenerator.getId()).setProcessBundleSplit(BeamFnApi.ProcessBundleSplitRequest.newBuilder().setInstructionId(this.bundleId).putAllDesiredSplits(splits).build()).build();
                    splitResponse = SdkHarnessClient.this.fnApiControlClient.handle(request);
                } catch (RuntimeException e) {
                    // Balance the register() above; otherwise close() would wait forever for a
                    // request that was never sent.
                    this.outstandingRequests.arrive();
                    throw e;
                }
                splitResponse.whenComplete((instructionResponse, throwable) -> {
                    // On failure the response is null; there is nothing to forward.
                    if (throwable != null || instructionResponse == null) {
                        return;
                    }
                    // Don't forward empty responses.
                    if (BeamFnApi.ProcessBundleSplitResponse.getDefaultInstance().equals(instructionResponse.getProcessBundleSplit())) {
                        return;
                    }
                    this.splitHandler.split(instructionResponse.getProcessBundleSplit());
                }).whenComplete((instructionResponse, throwable) -> this.outstandingRequests.arrive());
            }

            /**
             * Finishes the bundle. Only the first call does any work; later calls return
             * immediately.
             *
             * <p>Steps, in order:
             * <ol>
             *   <li>Close every input receiver.
             *   <li>If that succeeded, wait for the bundle response and for outstanding
             *       progress/split requests, then notify the progress, checkpoint and
             *       finalization handlers as the response requires. Otherwise record an
             *       {@link IllegalStateException}.
             *   <li>Deregister the state handler on success, or abort it on failure.
             *   <li>Await every output client on success, or cancel it on failure.
             * </ol>
             *
             * <p>Every step runs even if an earlier one failed. The first failure is thrown at
             * the end with all later failures attached as suppressed exceptions.
             *
             * @throws Exception the first failure encountered, if any
             */
            @Override
            public void close() throws Exception {
                if (this.isClosed.getAndSet(true)) {
                    return;
                }
                Exception exception = null;
                for (CloseableFnDataReceiver inputReceiver : this.inputReceivers.values()) {
                    try {
                        inputReceiver.close();
                    } catch (Exception e) {
                        exception = this.recordFailure(exception, e);
                    }
                }
                try {
                    if (exception != null) {
                        throw new IllegalStateException("Processing bundle failed, TODO: [https://github.com/apache/beam/issues/18756] abort bundle.");
                    }
                    BeamFnApi.ProcessBundleResponse completedResponse = MoreFutures.get(this.response);
                    // Let in-flight progress/split callbacks finish before reporting completion.
                    this.outstandingRequests.arriveAndAwaitAdvance();
                    this.progressHandler.onCompleted(completedResponse);
                    if (completedResponse.getResidualRootsCount() > 0) {
                        this.checkpointHandler.onCheckpoint(completedResponse);
                    }
                    if (completedResponse.getRequiresFinalization()) {
                        this.finalizationHandler.requestsFinalization(this.bundleId);
                    }
                } catch (Exception e) {
                    exception = this.recordFailure(exception, e);
                }
                try {
                    if (exception == null) {
                        this.stateRegistration.deregister();
                    } else {
                        this.stateRegistration.abort();
                    }
                } catch (Exception e) {
                    exception = this.recordFailure(exception, e);
                }
                for (InboundDataClient outputClient : this.outputClients.values()) {
                    try {
                        if (exception == null) {
                            outputClient.awaitCompletion();
                        } else {
                            outputClient.cancel();
                        }
                    } catch (Exception e) {
                        exception = this.recordFailure(exception, e);
                    }
                }
                if (exception != null) {
                    throw exception;
                }
            }

            /**
             * Folds a new failure into the failure recorded so far: the first failure becomes
             * the primary exception and later ones are attached to it as suppressed. An
             * exception is never added to itself, because {@link Throwable#addSuppressed}
             * rejects self-suppression with {@link IllegalArgumentException}.
             */
            private Exception recordFailure(Exception primary, Exception failure) {
                if (primary == null) {
                    return failure;
                }
                if (primary != failure) {
                    primary.addSuppressed(failure);
                }
                return primary;
            }
        }
    }
}

