/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.renderer.PipelineDotRenderer;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaExecutionEnvironment;
import org.apache.beam.runners.samza.SamzaPipelineLifeCycleListener;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.SamzaPipelineOptionsValidator;
import org.apache.beam.runners.samza.SamzaPipelineResult;
import org.apache.beam.runners.samza.SamzaPortablePipelineResult;
import org.apache.beam.runners.samza.translation.ConfigBuilder;
import org.apache.beam.runners.samza.translation.PViewToIdMapper;
import org.apache.beam.runners.samza.translation.PortableTranslationContext;
import org.apache.beam.runners.samza.translation.SamzaPipelineTranslator;
import org.apache.beam.runners.samza.translation.SamzaPortablePipelineTranslator;
import org.apache.beam.runners.samza.translation.SamzaTransformOverrides;
import org.apache.beam.runners.samza.translation.StateIdParser;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.context.ApplicationContainerContextFactory;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.metrics.MetricsReporterFactory;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunners;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamzaRunner
extends PipelineRunner<SamzaPipelineResult> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(SamzaRunner.class);
    private static final @UnknownKeyFor @NonNull @Initialized String BEAM_DOT_GRAPH = "beamDotGraph";
    private static final @UnknownKeyFor @NonNull @Initialized String BEAM_JSON_GRAPH = "beamJsonGraph";
    private final @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions options;
    private final @UnknownKeyFor @NonNull @Initialized SamzaPipelineLifeCycleListener listener;

    public static @UnknownKeyFor @NonNull @Initialized SamzaRunner fromOptions(@UnknownKeyFor @NonNull @Initialized PipelineOptions opts) {
        SamzaPipelineOptions samzaOptions = (SamzaPipelineOptions)PipelineOptionsValidator.validate(SamzaPipelineOptions.class, (PipelineOptions)opts);
        return new SamzaRunner(samzaOptions);
    }

    private SamzaRunner(@UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions options) {
        this.options = options;
        Iterator<SamzaPipelineLifeCycleListener.Registrar> listenerReg = ServiceLoader.load(SamzaPipelineLifeCycleListener.Registrar.class).iterator();
        this.listener = listenerReg.hasNext() ? ((SamzaPipelineLifeCycleListener.Registrar)Iterators.getOnlyElement(listenerReg)).getLifeCycleListener() : null;
    }

    public @UnknownKeyFor @NonNull @Initialized PortablePipelineResult runPortablePipeline(// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized RunnerApi.Pipeline pipeline, @UnknownKeyFor @NonNull @Initialized JobInfo jobInfo) {
        String dotGraph = PipelineDotRenderer.toDotString((RunnerApi.Pipeline)pipeline);
        LOG.info("Portable pipeline to run DOT graph:\n{}", (Object)dotGraph);
        ConfigBuilder configBuilder = new ConfigBuilder(this.options);
        SamzaPortablePipelineTranslator.createConfig(pipeline, configBuilder, this.options);
        configBuilder.put(BEAM_DOT_GRAPH, dotGraph);
        Config config = configBuilder.build();
        this.options.setConfigOverride((Map<String, String>)config);
        if (this.listener != null) {
            this.listener.onInit(config, this.options);
        }
        SamzaExecutionContext executionContext = new SamzaExecutionContext(this.options);
        Map<String, MetricsReporterFactory> reporterFactories = this.getMetricsReporters();
        StreamApplication app = appDescriptor -> {
            ((StreamApplicationDescriptor)appDescriptor.withApplicationContainerContextFactory((ApplicationContainerContextFactory)executionContext.new SamzaExecutionContext.Factory())).withMetricsReporterFactories(reporterFactories);
            SamzaPortablePipelineTranslator.translate(pipeline, new PortableTranslationContext((StreamApplicationDescriptor)appDescriptor, this.options, jobInfo));
        };
        ApplicationRunner runner = this.runSamzaApp(app, config);
        return new SamzaPortablePipelineResult(app, runner, executionContext, this.listener, config);
    }

    public @UnknownKeyFor @NonNull @Initialized SamzaPipelineResult run(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline) {
        if (!ExperimentalOptions.hasExperiment((PipelineOptions)pipeline.getOptions(), (String)"beam_fn_api")) {
            SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary((Pipeline)pipeline);
        }
        MetricsEnvironment.setMetricsSupported((boolean)true);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Pre-processed Beam pipeline in dot format:\n{}", (Object)PipelineDotRenderer.toDotString((Pipeline)pipeline));
            LOG.debug("Pre-processed Beam pipeline in json format:\n{}", (Object)PipelineJsonRenderer.toJsonString(pipeline));
        }
        pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
        String dotGraph = PipelineDotRenderer.toDotString((Pipeline)pipeline);
        LOG.info("Beam pipeline DOT graph:\n{}", (Object)dotGraph);
        String jsonGraph = PipelineJsonRenderer.toJsonString(pipeline);
        LOG.info("Beam pipeline JSON graph:\n{}", (Object)jsonGraph);
        Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
        Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
        ConfigBuilder configBuilder = new ConfigBuilder(this.options);
        SamzaPipelineTranslator.createConfig(pipeline, this.options, idMap, nonUniqueStateIds, configBuilder);
        configBuilder.put(BEAM_DOT_GRAPH, dotGraph);
        configBuilder.put(BEAM_JSON_GRAPH, jsonGraph);
        Config config = configBuilder.build();
        this.options.setConfigOverride((Map<String, String>)config);
        if (this.listener != null) {
            this.listener.onInit(config, this.options);
        }
        SamzaExecutionContext executionContext = new SamzaExecutionContext(this.options);
        Map<String, MetricsReporterFactory> reporterFactories = this.getMetricsReporters();
        StreamApplication app = appDescriptor -> {
            appDescriptor.withApplicationContainerContextFactory((ApplicationContainerContextFactory)executionContext.new SamzaExecutionContext.Factory());
            appDescriptor.withMetricsReporterFactories(reporterFactories);
            SamzaPipelineTranslator.translate(pipeline, new TranslationContext((StreamApplicationDescriptor)appDescriptor, idMap, nonUniqueStateIds, this.options));
        };
        SamzaPipelineOptionsValidator.validate(this.options);
        ApplicationRunner runner = this.runSamzaApp(app, config);
        return new SamzaPipelineResult(runner, executionContext, this.listener, config);
    }

    private @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized MetricsReporterFactory> getMetricsReporters() {
        if (this.options.getMetricsReporters() != null) {
            HashMap<String, MetricsReporterFactory> reporters = new HashMap<String, MetricsReporterFactory>();
            for (int i = 0; i < this.options.getMetricsReporters().size(); ++i) {
                String name = "beam-metrics-reporter-" + i;
                MetricsReporter reporter = this.options.getMetricsReporters().get(i);
                reporters.put(name, (MetricsReporterFactory & Serializable)(nm, processorId, config) -> reporter);
                LOG.info(name + ": " + reporter.getClass().getName());
            }
            return reporters;
        }
        return Collections.emptyMap();
    }

    private @UnknownKeyFor @NonNull @Initialized ApplicationRunner runSamzaApp(@UnknownKeyFor @NonNull @Initialized StreamApplication app, @UnknownKeyFor @NonNull @Initialized Config config) {
        ApplicationRunner runner = ApplicationRunners.getApplicationRunner((SamzaApplication)app, (Config)config);
        ExternalContext externalContext = null;
        if (this.listener != null) {
            externalContext = this.listener.onStart();
        }
        runner.run(externalContext);
        if (this.listener != null && this.options.getSamzaExecutionEnvironment() == SamzaExecutionEnvironment.YARN) {
            this.listener.onSubmit();
        }
        return runner;
    }
}

