/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.fnexecution.control;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.BundleCheckpointHandler;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.BundleFinalizationHandler;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.RemoteOutputReceiver;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.TimerReceiverFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.direct_java.sdk.fn.IdGenerator;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;

/**
 * A {@link JobBundleFactory} that lazily creates, and then reuses, one {@link RemoteEnvironment}
 * per distinct {@link RunnerApi.Environment} and one {@link StageBundleFactory} per
 * {@link ExecutableStage}.
 *
 * <p>Both caches are {@link ConcurrentMap}s filled through {@code computeIfAbsent}. Every
 * environment created by this factory is closed by {@link #close()}.
 *
 * <p>The class is annotated {@code @Deprecated}; this file does not name a replacement.
 */
@Deprecated
public class SingleEnvironmentInstanceJobBundleFactory
implements JobBundleFactory {
    private final EnvironmentFactory environmentFactory;
    private final GrpcFnServer<GrpcDataService> dataService;
    private final GrpcFnServer<GrpcStateService> stateService;
    /** One bundle factory per stage, created on the first {@link #forStage} call for that stage. */
    private final ConcurrentMap<ExecutableStage, StageBundleFactory> stageBundleFactories = new ConcurrentHashMap<>();
    /** One remote environment per environment definition, shared by every stage that uses it. */
    private final ConcurrentMap<RunnerApi.Environment, RemoteEnvironment> environments = new ConcurrentHashMap<>();
    private final IdGenerator idGenerator;

    /**
     * Creates a new factory.
     *
     * @param environmentFactory used to start a remote environment the first time one is needed
     * @param data data-plane server; supplies both the data service and its API service descriptor
     * @param state state server; supplies both the state service and its API service descriptor
     * @param idGenerator source of IDs for environments, bundle descriptors and the harness client
     * @return the new factory, typed as {@link JobBundleFactory}
     */
    public static JobBundleFactory create(EnvironmentFactory environmentFactory, GrpcFnServer<GrpcDataService> data, GrpcFnServer<GrpcStateService> state, IdGenerator idGenerator) {
        return new SingleEnvironmentInstanceJobBundleFactory(environmentFactory, data, state, idGenerator);
    }

    private SingleEnvironmentInstanceJobBundleFactory(EnvironmentFactory environmentFactory, GrpcFnServer<GrpcDataService> dataService, GrpcFnServer<GrpcStateService> stateService, IdGenerator idGenerator) {
        this.environmentFactory = environmentFactory;
        this.dataService = dataService;
        this.stateService = stateService;
        this.idGenerator = idGenerator;
    }

    /**
     * Returns the bundle factory for {@code executableStage}, building and caching it on first use.
     *
     * @param executableStage the stage to obtain a bundle factory for
     * @return the cached (or newly created) factory for that stage
     * @throws RuntimeException if the environment cannot be created or the stage cannot be
     *     translated into a process bundle descriptor; the original exception is the cause
     */
    @Override
    public StageBundleFactory forStage(ExecutableStage executableStage) {
        return stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory);
    }

    /**
     * Builds the bundle factory for one stage: resolves its (possibly shared) remote environment,
     * wraps that environment's instruction handler in an {@link SdkHarnessClient}, derives the
     * stage's process bundle descriptor and obtains the matching bundle processor.
     */
    private StageBundleFactory createBundleFactory(ExecutableStage stage) {
        RemoteEnvironment environment = environments.computeIfAbsent(stage.getEnvironment(), this::startEnvironment);
        SdkHarnessClient harnessClient = SdkHarnessClient
            .usingFnApiClient(environment.getInstructionRequestHandler(), dataService.getService())
            .withIdGenerator(idGenerator);
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor stageDescriptor = describeStage(stage);
        SdkHarnessClient.BundleProcessor stageProcessor = harnessClient.getProcessor(
            stageDescriptor.getProcessBundleDescriptor(),
            stageDescriptor.getRemoteInputDestinations(),
            stateService.getService());
        return new BundleProcessorStageBundleFactory(stageDescriptor, stageProcessor, harnessClient);
    }

    /**
     * Starts a remote environment for {@code environment} under a freshly generated ID.
     * Any failure is rethrown as a {@link RuntimeException} carrying the original as its cause,
     * because this method is used as a {@code computeIfAbsent} mapping function.
     */
    private RemoteEnvironment startEnvironment(RunnerApi.Environment environment) {
        try {
            return environmentFactory.createEnvironment(environment, idGenerator.getId());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Translates {@code stage} into an executable process bundle descriptor with a freshly
     * generated ID, pointing it at this factory's data and state API service descriptors.
     * An {@link IOException} is rethrown as a {@link RuntimeException} with the original as cause.
     */
    private ProcessBundleDescriptors.ExecutableProcessBundleDescriptor describeStage(ExecutableStage stage) {
        try {
            return ProcessBundleDescriptors.fromExecutableStage(
                idGenerator.getId(),
                stage,
                dataService.getApiServiceDescriptor(),
                stateService.getApiServiceDescriptor());
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes every remote environment this factory has created. All environments are attempted
     * even if some fail.
     *
     * @throws Exception the first failure encountered, with any later failures attached to it
     *     as suppressed exceptions
     */
    @Override
    public void close() throws Exception {
        Exception firstFailure = null;
        for (RemoteEnvironment environment : environments.values()) {
            try {
                environment.close();
            }
            catch (Exception e) {
                if (firstFailure != null) {
                    firstFailure.addSuppressed(e);
                } else {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * A {@link StageBundleFactory} backed by a single {@link SdkHarnessClient.BundleProcessor}
     * and the descriptor that processor was obtained for.
     */
    private static class BundleProcessorStageBundleFactory
    implements StageBundleFactory {
        private final ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor;
        private final SdkHarnessClient client;
        private final SdkHarnessClient.BundleProcessor processor;

        private BundleProcessorStageBundleFactory(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor, SdkHarnessClient.BundleProcessor processor, SdkHarnessClient client) {
            this.descriptor = descriptor;
            this.processor = processor;
            this.client = client;
        }

        /**
         * Starts a new bundle on the processor.
         *
         * <p>Output receivers are built first (one per remote output coder), then timer receivers
         * (one per timer spec); the four handlers are handed to the processor unchanged.
         *
         * @param outputReceiverFactory creates the receiver for each output PCollection
         * @param timerReceiverFactory creates the receiver for each (transform ID, timer ID) pair
         * @param stateRequestHandler forwarded to the processor
         * @param progressHandler forwarded to the processor
         * @param finalizationHandler forwarded to the processor
         * @param checkpointHandler forwarded to the processor
         * @return the bundle returned by the processor's {@code newBundle}
         */
        @Override
        public RemoteBundle getBundle(OutputReceiverFactory outputReceiverFactory, TimerReceiverFactory timerReceiverFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler, BundleFinalizationHandler finalizationHandler, BundleCheckpointHandler checkpointHandler) {
            Map outputReceivers = createOutputReceivers(outputReceiverFactory);
            Map timerReceivers = createTimerReceivers(timerReceiverFactory);
            return processor.newBundle(outputReceivers, timerReceivers, stateRequestHandler, progressHandler, finalizationHandler, checkpointHandler);
        }

        /**
         * Builds the map from remote-output transform ID to its {@link RemoteOutputReceiver}.
         *
         * <p>For each remote output coder, the target PCollection is the single value in the
         * inputs map of the transform registered under the same key.
         */
        // NOTE(review): the map is left raw because the element types expected by
        // BundleProcessor#newBundle are not visible in this file; parameterize once confirmed.
        private Map createOutputReceivers(OutputReceiverFactory outputReceiverFactory) {
            Map receiversByTransformId = new HashMap();
            for (Map.Entry<String, Coder> coderEntry : descriptor.getRemoteOutputCoders().entrySet()) {
                String transformId = coderEntry.getKey();
                String pCollection = Iterables.getOnlyElement(
                    descriptor.getProcessBundleDescriptor().getTransformsOrThrow(transformId).getInputsMap().values());
                FnDataReceiver pCollectionReceiver = outputReceiverFactory.create(pCollection);
                receiversByTransformId.put(transformId, RemoteOutputReceiver.of(coderEntry.getValue(), pCollectionReceiver));
            }
            return receiversByTransformId;
        }

        /**
         * Builds the map from {@code KV(transformId, timerId)} to the {@link RemoteOutputReceiver}
         * for that timer, covering every timer spec in the descriptor.
         */
        // NOTE(review): raw for the same reason as createOutputReceivers.
        private Map createTimerReceivers(TimerReceiverFactory timerReceiverFactory) {
            Map receiversByTimer = new HashMap();
            // The outer key is not needed: each spec carries its own transform ID.
            for (Map<String, ProcessBundleDescriptors.TimerSpec> specsOfTransform : descriptor.getTimerSpecs().values()) {
                for (ProcessBundleDescriptors.TimerSpec spec : specsOfTransform.values()) {
                    String transformId = spec.transformId();
                    String timerId = spec.timerId();
                    FnDataReceiver timerReceiver = timerReceiverFactory.create(transformId, timerId);
                    receiversByTimer.put(KV.of(transformId, timerId), RemoteOutputReceiver.of(spec.coder(), timerReceiver));
                }
            }
            return receiversByTimer;
        }

        /** Returns the descriptor this factory was constructed with. */
        @Override
        public ProcessBundleDescriptors.ExecutableProcessBundleDescriptor getProcessBundleDescriptor() {
            return descriptor;
        }

        /** Returns the instruction request handler of the underlying {@link SdkHarnessClient}. */
        @Override
        public InstructionRequestHandler getInstructionRequestHandler() {
            return client.getInstructionRequestHandler();
        }

        /**
         * Does nothing. Remote environments are closed by the enclosing
         * {@code SingleEnvironmentInstanceJobBundleFactory}'s {@code close()}, not here.
         */
        @Override
        public void close() {
        }
    }
}

