/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.IndexedCombinedWatermarkStatus;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.LatencyStats;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for stream operators that receive their runtime dependencies through
 * {@link StreamOperatorParameters} at construction time and that may have any number of inputs.
 *
 * <p>The class wires up metrics, latency statistics, the runtime context, state handling and timer
 * services, and it combines per-input watermarks, watermark statuses and record attributes into a
 * single value that is forwarded to {@link #output}.
 *
 * <p>Input ids passed to {@link #reportWatermark}, {@link #processWatermarkStatus} and
 * {@link #processRecordAttributes} are converted to array/index positions by subtracting one.
 *
 * @param <OUT> the type of the records emitted by this operator
 */
@Experimental
public abstract class AbstractStreamOperatorV2<OUT>
        implements StreamOperator<OUT>, StreamOperatorStateHandler.CheckpointedStreamOperator {

    /** Logger shared by this class and its subclasses. */
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperatorV2.class);

    /** Configuration of this operator, taken from the construction parameters. */
    protected final StreamConfig config;

    /** Collector to which records, watermarks, statuses and markers are emitted. */
    protected final Output<StreamRecord<OUT>> output;

    private final StreamingRuntimeContext runtimeContext;
    private final ExecutionConfig executionConfig;
    private final ClassLoader userCodeClassLoader;
    private final CloseableRegistry cancelables;

    /** Tracks the watermark and idleness of every input and derives the combined value. */
    private final IndexedCombinedWatermarkStatus combinedWatermark;

    /** Metric group of this operator. */
    protected final InternalOperatorMetricGroup metrics;

    /** Latency statistics fed by {@link #reportOrForwardLatencyMarker(LatencyMarker)}. */
    protected final LatencyStats latencyStats;

    /** Processing-time service supplied by the construction parameters; never {@code null}. */
    protected final ProcessingTimeService processingTimeService;

    /** Most recent record attributes per input, indexed by {@code inputId - 1}. */
    protected final RecordAttributes[] lastRecordAttributes;

    /** Set by {@link #initializeState(StreamTaskStateInitializer)}; {@code null} before that. */
    private StreamOperatorStateHandler stateHandler;

    /** Set by {@link #initializeState(StreamTaskStateInitializer)}; may be {@code null}. */
    private InternalTimeServiceManager<?> timeServiceManager;

    /**
     * Creates the operator and initializes everything that does not depend on restored state.
     *
     * @param parameters container for the stream config, output, containing task and
     *     processing-time service
     * @param numberOfInputs number of inputs; sizes the per-input watermark and record-attribute
     *     bookkeeping
     * @throws NullPointerException if the parameters carry no processing-time service
     */
    public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int numberOfInputs) {
        final Environment environment = parameters.getContainingTask().getEnvironment();
        config = parameters.getStreamConfig();
        output = parameters.getOutput();
        metrics =
                environment
                        .getMetricGroup()
                        .getOrAddOperator(config.getOperatorID(), config.getOperatorName());
        // Must run after 'config' and 'metrics' are assigned: createLatencyStats reads both.
        latencyStats =
                createLatencyStats(
                        environment.getTaskManagerInfo().getConfiguration(),
                        parameters.getContainingTask().getIndexInSubtaskGroup());
        processingTimeService = Preconditions.checkNotNull(parameters.getProcessingTimeService());

        // Every input starts out with the empty record attributes.
        lastRecordAttributes = new RecordAttributes[numberOfInputs];
        for (int i = 0; i < numberOfInputs; ++i) {
            lastRecordAttributes[i] = RecordAttributes.EMPTY_RECORD_ATTRIBUTES;
        }

        executionConfig = parameters.getContainingTask().getExecutionConfig();
        userCodeClassLoader = parameters.getContainingTask().getUserCodeClassLoader();
        cancelables = parameters.getContainingTask().getCancelables();
        combinedWatermark = IndexedCombinedWatermarkStatus.forInputsCount(numberOfInputs);
        runtimeContext =
                new StreamingRuntimeContext(
                        environment,
                        environment.getAccumulatorRegistry().getUserMap(),
                        metrics,
                        getOperatorID(),
                        processingTimeService,
                        null,
                        environment.getExternalResourceInfoProvider());
    }

    /**
     * Builds the latency statistics from the task manager configuration.
     *
     * <p>A non-positive history size is replaced by the option's default, and an unknown
     * granularity name falls back to {@link LatencyStats.Granularity#OPERATOR}; both cases are
     * logged. Any other failure is logged and answered with statistics attached to an unregistered
     * metric group, so that latency metrics can never prevent the operator from being created.
     *
     * @param taskManagerConfig configuration to read the latency options from
     * @param indexInSubtaskGroup subtask index passed on to the statistics
     * @return the latency statistics, never {@code null}
     */
    private LatencyStats createLatencyStats(
            Configuration taskManagerConfig, int indexInSubtaskGroup) {
        try {
            int historySize = taskManagerConfig.get(MetricOptions.LATENCY_HISTORY_SIZE);
            if (historySize <= 0) {
                LOG.warn(
                        "{} has been set to a value equal or below 0: {}. Using default.",
                        MetricOptions.LATENCY_HISTORY_SIZE,
                        historySize);
                historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
            }

            final String configuredGranularity =
                    taskManagerConfig.get(MetricOptions.LATENCY_SOURCE_GRANULARITY);
            LatencyStats.Granularity granularity;
            try {
                granularity =
                        LatencyStats.Granularity.valueOf(
                                configuredGranularity.toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException iae) {
                granularity = LatencyStats.Granularity.OPERATOR;
                LOG.warn(
                        "Configured value {} option for {} is invalid. Defaulting to {}.",
                        configuredGranularity,
                        MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
                        granularity);
            }

            final MetricGroup taskMetricGroup = metrics.getTaskMetricGroup();
            return new LatencyStats(
                    taskMetricGroup.addGroup("latency"),
                    historySize,
                    indexInSubtaskGroup,
                    getOperatorID(),
                    granularity);
        } catch (Exception e) {
            // Deliberately broad: latency metrics are best-effort.
            LOG.warn("An error occurred while instantiating latency metrics.", e);
            return new LatencyStats(
                    UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()
                            .addGroup("latency"),
                    1,
                    0,
                    new OperatorID(),
                    LatencyStats.Granularity.SINGLE);
        }
    }

    /** Returns the metric group of this operator. */
    @Override
    public OperatorMetricGroup getMetricGroup() {
        return metrics;
    }

    /**
     * Obtains the operator's state context from the given initializer, creates the state handler
     * and timer service manager from it, and then lets the state handler initialize the operator
     * state of this operator.
     *
     * @param streamTaskStateManager source of the operator state context
     * @throws Exception if creating the state context or initializing the state fails
     */
    @Override
    public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
            throws Exception {
        final TypeSerializer<?> keySerializer =
                config.getStateKeySerializer(getUserCodeClassloader());
        final StreamOperatorStateContext context =
                streamTaskStateManager.streamOperatorStateContext(
                        getOperatorID(),
                        getClass().getSimpleName(),
                        getProcessingTimeService(),
                        this,
                        keySerializer,
                        cancelables,
                        metrics,
                        config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                ManagedMemoryUseCase.STATE_BACKEND,
                                runtimeContext.getJobConfiguration(),
                                runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                                runtimeContext.getUserCodeClassLoader()),
                        isUsingCustomRawKeyedState());
        stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables);
        timeServiceManager = context.internalTimerServiceManager();
        stateHandler.initializeOperatorState(this);
    }

    /**
     * Flag passed to state initialization and snapshotting; this base implementation returns
     * {@code false}. Subclasses may override it.
     */
    @Internal
    protected boolean isUsingCustomRawKeyedState() {
        return false;
    }

    /** No-op in this base class. */
    @Override
    public void open() throws Exception {
    }

    /** No-op in this base class. */
    @Override
    public void finish() throws Exception {
    }

    /** Disposes the state handler if state has been initialized; otherwise does nothing. */
    @Override
    public void close() throws Exception {
        if (stateHandler != null) {
            stateHandler.dispose();
        }
    }

    /** No-op in this base class. */
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    }

    /**
     * Delegates the snapshot to the state handler, handing over the timer service manager (if
     * any) and the operator name.
     *
     * @return the futures produced by the state handler for this snapshot
     */
    @Override
    public final OperatorSnapshotFutures snapshotState(
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory)
            throws Exception {
        return stateHandler.snapshotState(
                this,
                Optional.ofNullable(timeServiceManager),
                getOperatorName(),
                checkpointId,
                timestamp,
                checkpointOptions,
                factory,
                isUsingCustomRawKeyedState());
    }

    /** No-op in this base class; subclasses override it to write their state. */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
    }

    /** No-op in this base class; subclasses override it to restore their state. */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
    }

    /** Forwards the completion notification to the state handler. */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        stateHandler.notifyCheckpointComplete(checkpointId);
    }

    /** Forwards the abort notification to the state handler. */
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        stateHandler.notifyCheckpointAborted(checkpointId);
    }

    /** Returns the execution config obtained from the containing task. */
    public ExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    /** Returns the stream config of this operator. */
    public StreamConfig getOperatorConfig() {
        return config;
    }

    /** Returns the user-code class loader obtained from the containing task. */
    public ClassLoader getUserCodeClassloader() {
        return userCodeClassLoader;
    }

    /**
     * Returns the task name with subtasks from the runtime context, or the simple class name while
     * the runtime context is not yet available.
     */
    protected String getOperatorName() {
        // The runtime context is assigned last in the constructor, so it can still be null if this
        // method is reached during construction.
        if (runtimeContext != null) {
            return runtimeContext.getTaskInfo().getTaskNameWithSubtasks();
        }
        return getClass().getSimpleName();
    }

    /** Returns the runtime context created in the constructor. */
    public StreamingRuntimeContext getRuntimeContext() {
        return runtimeContext;
    }

    /**
     * Returns the keyed state backend held by the state handler. Requires that state has been
     * initialized.
     */
    public <K> KeyedStateBackend<K> getKeyedStateBackend() {
        return stateHandler.getKeyedStateBackend();
    }

    /**
     * Returns the operator state backend held by the state handler. Requires that state has been
     * initialized.
     */
    @VisibleForTesting
    public OperatorStateBackend getOperatorStateBackend() {
        return stateHandler.getOperatorStateBackend();
    }

    /** Returns the processing-time service supplied at construction. */
    @VisibleForTesting
    public ProcessingTimeService getProcessingTimeService() {
        return processingTimeService;
    }

    /**
     * Returns the partitioned state for the given descriptor in the {@link VoidNamespace}.
     *
     * @throws Exception if the state handler fails to provide the state
     */
    protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor)
            throws Exception {
        return getPartitionedState(
                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
    }

    /**
     * Delegates to the state handler to obtain, or create, the keyed state for the descriptor.
     *
     * @throws Exception if the state handler fails to provide the state
     */
    protected <N, S extends State, T> S getOrCreateKeyedState(
            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor)
            throws Exception {
        return stateHandler.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
    }

    /**
     * Delegates to the state handler to obtain the partitioned state for the given namespace.
     *
     * @throws Exception if the state handler fails to provide the state
     */
    protected <S extends State, N> S getPartitionedState(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, ?> stateDescriptor)
            throws Exception {
        return stateHandler.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
    }

    /**
     * Extracts the key of the record with the given selector and makes it the current key. Does
     * nothing when {@code selector} is {@code null}.
     *
     * @throws Exception if the key selector fails
     */
    protected <T> void internalSetKeyContextElement(
            StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
        if (selector != null) {
            final Object key = selector.getKey(record.getValue());
            setCurrentKey(key);
        }
    }

    /** Sets the current key on the state handler. */
    @Override
    public void setCurrentKey(Object key) {
        stateHandler.setCurrentKey(key);
    }

    /** Returns the current key from the state handler. */
    @Override
    public Object getCurrentKey() {
        return stateHandler.getCurrentKey();
    }

    /**
     * Returns the keyed state store of the state handler, or an empty optional while state has not
     * been initialized.
     */
    public Optional<KeyedStateStore> getKeyedStateStore() {
        if (stateHandler == null) {
            return Optional.empty();
        }
        return stateHandler.getKeyedStateStore();
    }

    /** Records the marker in the latency statistics and then emits it downstream. */
    protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
        latencyStats.reportLatency(marker);
        output.emitLatencyMarker(marker);
    }

    /**
     * Returns the timer service registered under {@code name}, as provided by the timer service
     * manager for the key serializer of the keyed state backend.
     *
     * @param name name of the timer service
     * @param namespaceSerializer serializer for the timer namespace
     * @param triggerable callback invoked when timers fire
     * @throws RuntimeException if no timer service manager is available
     * @throws IllegalStateException if there is no keyed state backend
     */
    @VisibleForTesting
    public <K, N> InternalTimerService<N> getInternalTimerService(
            String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
        if (timeServiceManager == null) {
            throw new RuntimeException("The timer service has not been initialized.");
        }
        // The manager is stored with a wildcard key type; the caller chooses K, so the cast cannot
        // be checked here.
        @SuppressWarnings("unchecked")
        final InternalTimeServiceManager<K> keyedTimeServiceHandler =
                (InternalTimeServiceManager<K>) timeServiceManager;
        final KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
        Preconditions.checkState(
                keyedStateBackend != null, "Timers can only be used on keyed operators.");
        return keyedTimeServiceHandler.getInternalTimerService(
                name, keyedStateBackend.getKeySerializer(), namespaceSerializer, triggerable);
    }

    /**
     * Advances the timer service manager (if any) to the given watermark and then emits the
     * watermark downstream.
     *
     * @throws Exception if advancing the watermark fails
     */
    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }

    /**
     * Records the watermark of one input and, if the combined watermark tracker reports an update,
     * processes the new combined watermark.
     *
     * @param mark watermark received on the input
     * @param inputId id of the input; {@code inputId - 1} is used as the tracker index
     * @throws Exception if processing the combined watermark fails
     */
    protected void reportWatermark(Watermark mark, int inputId) throws Exception {
        if (combinedWatermark.updateWatermark(inputId - 1, mark.getTimestamp())) {
            processWatermark(new Watermark(combinedWatermark.getCombinedWatermark()));
        }
    }

    /**
     * Records the idleness of one input. If the tracker reports an update, the new combined
     * watermark is processed; if the combined idleness flipped, the received status is emitted
     * downstream.
     *
     * @param watermarkStatus status received on the input
     * @param inputId id of the input; {@code inputId - 1} is used as the tracker index
     * @throws Exception if processing the combined watermark fails
     */
    public final void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId)
            throws Exception {
        final boolean wasIdle = combinedWatermark.isIdle();
        if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) {
            processWatermark(new Watermark(combinedWatermark.getCombinedWatermark()));
        }
        if (wasIdle != combinedWatermark.isIdle()) {
            output.emitWatermarkStatus(watermarkStatus);
        }
    }

    /**
     * Stores the attributes as the latest ones of the given input and emits attributes built from
     * the latest attributes of all inputs.
     *
     * @param recordAttributes attributes received on the input
     * @param inputId id of the input; {@code inputId - 1} is used as the array index
     * @throws Exception declared for overriding implementations
     */
    public void processRecordAttributes(RecordAttributes recordAttributes, int inputId)
            throws Exception {
        lastRecordAttributes[inputId - 1] = recordAttributes;
        output.emitRecordAttributes(
                new RecordAttributesBuilder(Arrays.asList(lastRecordAttributes)).build());
    }

    /** Returns the operator id stored in the stream config. */
    @Override
    public OperatorID getOperatorID() {
        return config.getOperatorID();
    }

    /**
     * Not supported by this class.
     *
     * @throws IllegalStateException always
     */
    @Override
    public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
        throw new IllegalStateException(
                "This method should never be called. Use Input class instead");
    }

    /**
     * Not supported by this class.
     *
     * @throws IllegalStateException always
     */
    @Override
    public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
        throw new IllegalStateException(
                "This method should never be called. Use Input class instead");
    }

    /** Returns the timer service manager, or an empty optional if there is none. */
    protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
        return Optional.ofNullable(timeServiceManager);
    }
}

