/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.SplittableProcessElementInvoker;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.KeyedInternals;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.Op;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.runners.samza.runtime.OutputManagerFactory;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SplittableParDoProcessKeyedElementsOp<@UnknownKeyFor InputT, @UnknownKeyFor OutputT, @UnknownKeyFor RestrictionT, @UnknownKeyFor PositionT, @UnknownKeyFor WatermarkEstimatorStateT>
implements Op<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, RawUnionValue, byte[]> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(SplittableParDoProcessKeyedElementsOp.class);
    private static final @UnknownKeyFor @NonNull @Initialized String TIMER_STATE_ID = "timer";
    private final @UnknownKeyFor @NonNull @Initialized TupleTag<OutputT> mainOutputTag;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @NonNull @Initialized BoundedWindow> windowingStrategy;
    private final @UnknownKeyFor @NonNull @Initialized OutputManagerFactory<@UnknownKeyFor @NonNull @Initialized RawUnionValue> outputManagerFactory;
    private final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> processElements;
    private final @UnknownKeyFor @NonNull @Initialized String transformId;
    private final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized PCollection.IsBounded isBounded;
    private transient @UnknownKeyFor @NonNull @Initialized StateInternalsFactory<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> stateInternalsFactory;
    private transient @UnknownKeyFor @NonNull @Initialized SamzaTimerInternalsFactory<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> timerInternalsFactory;
    private transient @UnknownKeyFor @NonNull @Initialized DoFnRunner<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized KV<InputT, RestrictionT>>, OutputT> fnRunner;
    private transient @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions pipelineOptions;
    private transient @MonotonicNonNull @UnknownKeyFor @Initialized ScheduledExecutorService ses = null;

    public SplittableParDoProcessKeyedElementsOp(@UnknownKeyFor @NonNull @Initialized TupleTag<OutputT> mainOutputTag, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT, WatermarkEstimatorStateT> processKeyedElements, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @NonNull @Initialized BoundedWindow> windowingStrategy, @UnknownKeyFor @NonNull @Initialized OutputManagerFactory<@UnknownKeyFor @NonNull @Initialized RawUnionValue> outputManagerFactory, @UnknownKeyFor @NonNull @Initialized String transformFullName, @UnknownKeyFor @NonNull @Initialized String transformId, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized PCollection.IsBounded isBounded) {
        this.mainOutputTag = mainOutputTag;
        this.windowingStrategy = windowingStrategy;
        this.outputManagerFactory = outputManagerFactory;
        this.transformId = transformId;
        this.isBounded = isBounded;
        this.processElements = new SplittableParDoViaKeyedWorkItems.ProcessElements(processKeyedElements);
    }

    @Override
    public void open(@UnknownKeyFor @NonNull @Initialized Config config, @UnknownKeyFor @NonNull @Initialized Context context, @UnknownKeyFor @NonNull @Initialized Scheduler<@UnknownKeyFor @NonNull @Initialized KeyedTimerData<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>> timerRegistry, @UnknownKeyFor @NonNull @Initialized OpEmitter<@UnknownKeyFor @NonNull @Initialized RawUnionValue> emitter) {
        this.pipelineOptions = (SamzaPipelineOptions)((SerializablePipelineOptions)Base64Serializer.deserializeUnchecked((String)((String)config.get((Object)"beamPipelineOptions")), SerializablePipelineOptions.class)).get().as(SamzaPipelineOptions.class);
        SamzaStoreStateInternals.Factory nonKeyedStateInternalsFactory = SamzaStoreStateInternals.createNonKeyedStateInternalsFactory(this.transformId, context.getTaskContext(), this.pipelineOptions);
        final DoFnRunners.OutputManager outputManager = this.outputManagerFactory.create(emitter);
        this.stateInternalsFactory = new SamzaStoreStateInternals.Factory<byte[]>(this.transformId, Collections.singletonMap("beamStore", SamzaStoreStateInternals.getBeamStore(context.getTaskContext())), (Coder<byte[]>)ByteArrayCoder.of(), this.pipelineOptions.getStoreBatchGetSize());
        this.timerInternalsFactory = SamzaTimerInternalsFactory.createTimerInternalFactory(ByteArrayCoder.of(), timerRegistry, TIMER_STATE_ID, nonKeyedStateInternalsFactory, this.windowingStrategy, this.isBounded, this.pipelineOptions);
        if (this.ses == null) {
            this.ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("samza-sdf-executor-%d").build());
        }
        final KeyedInternals<byte[]> keyedInternals = new KeyedInternals<byte[]>(this.stateInternalsFactory, this.timerInternalsFactory);
        SplittableParDoViaKeyedWorkItems.ProcessFn processFn = this.processElements.newProcessFn(this.processElements.getFn());
        DoFnInvokers.tryInvokeSetupFor((DoFn)processFn, (PipelineOptions)this.pipelineOptions);
        processFn.setStateInternalsFactory(this.stateInternalsFactory);
        processFn.setTimerInternalsFactory(this.timerInternalsFactory);
        processFn.setSideInputReader((SideInputReader)NullSideInputReader.empty());
        processFn.setProcessElementInvoker((SplittableProcessElementInvoker)new OutputAndTimeBoundedSplittableProcessElementInvoker(this.processElements.getFn(), (PipelineOptions)this.pipelineOptions, new OutputWindowedValue<OutputT>(){

            public void outputWindowedValue(OutputT output, @UnknownKeyFor @NonNull @Initialized Instant timestamp, @UnknownKeyFor @NonNull @Initialized Collection<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized BoundedWindow> windows, @UnknownKeyFor @NonNull @Initialized PaneInfo pane) {
                this.outputWindowedValue(SplittableParDoProcessKeyedElementsOp.this.mainOutputTag, output, timestamp, windows, pane);
            }

            public <AdditionalOutputT> void outputWindowedValue(@UnknownKeyFor @NonNull @Initialized TupleTag<AdditionalOutputT> tag, AdditionalOutputT output, @UnknownKeyFor @NonNull @Initialized Instant timestamp, @UnknownKeyFor @NonNull @Initialized Collection<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized BoundedWindow> windows, @UnknownKeyFor @NonNull @Initialized PaneInfo pane) {
                outputManager.output(tag, WindowedValue.of(output, (Instant)timestamp, windows, (PaneInfo)pane));
            }
        }, (SideInputReader)NullSideInputReader.empty(), this.ses, 10000, Duration.standardSeconds((long)10L), () -> {
            throw new UnsupportedOperationException("BundleFinalizer unsupported in Samza");
        }));
        StepContext stepContext = new StepContext(){

            public @UnknownKeyFor @NonNull @Initialized StateInternals stateInternals() {
                return keyedInternals.stateInternals();
            }

            public @UnknownKeyFor @NonNull @Initialized TimerInternals timerInternals() {
                return keyedInternals.timerInternals();
            }
        };
        this.fnRunner = DoFnRunners.simpleRunner((PipelineOptions)this.pipelineOptions, (DoFn)processFn, (SideInputReader)NullSideInputReader.of(Collections.emptyList()), (DoFnRunners.OutputManager)outputManager, this.mainOutputTag, Collections.emptyList(), (StepContext)stepContext, null, Collections.emptyMap(), this.windowingStrategy, (DoFnSchemaInformation)DoFnSchemaInformation.create(), Collections.emptyMap());
    }

    @Override
    public void processElement(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [], @UnknownKeyFor @NonNull @Initialized KV<InputT, RestrictionT>>> inputElement, @UnknownKeyFor @NonNull @Initialized OpEmitter<@UnknownKeyFor @NonNull @Initialized RawUnionValue> emitter) {
        this.fnRunner.startBundle();
        this.fnRunner.processElement(inputElement);
        this.fnRunner.finishBundle();
    }

    @Override
    public void processWatermark(@UnknownKeyFor @NonNull @Initialized Instant watermark, @UnknownKeyFor @NonNull @Initialized OpEmitter<@UnknownKeyFor @NonNull @Initialized RawUnionValue> emitter) {
        this.timerInternalsFactory.setInputWatermark(watermark);
        Collection<KeyedTimerData<byte[]>> readyTimers = this.timerInternalsFactory.removeReadyTimers();
        if (!readyTimers.isEmpty()) {
            this.fnRunner.startBundle();
            for (KeyedTimerData<byte[]> keyedTimerData : readyTimers) {
                this.fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
            }
            this.fnRunner.finishBundle();
        }
        if (this.timerInternalsFactory.getOutputWatermark() == null || this.timerInternalsFactory.getOutputWatermark().isBefore((ReadableInstant)watermark)) {
            this.timerInternalsFactory.setOutputWatermark(watermark);
            emitter.emitWatermark(this.timerInternalsFactory.getOutputWatermark());
        }
    }

    @Override
    public void processTimer(@UnknownKeyFor @NonNull @Initialized KeyedTimerData<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> keyedTimerData, @UnknownKeyFor @NonNull @Initialized OpEmitter<@UnknownKeyFor @NonNull @Initialized RawUnionValue> emitter) {
        this.fnRunner.startBundle();
        this.fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
        this.fnRunner.finishBundle();
        this.timerInternalsFactory.removeProcessingTimer(keyedTimerData);
    }

    private void fireTimer(@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] key, // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized TimerInternals.TimerData timer) {
        LOG.debug("Firing timer {} for key {}", (Object)timer, (Object)key);
        this.fnRunner.processElement(WindowedValue.valueInGlobalWindow((Object)KeyedWorkItems.timersWorkItem((Object)key, Collections.singletonList(timer))));
    }
}

