/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.datastream.impl.operators;

import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy;
import org.apache.flink.datastream.api.context.NonPartitionedContext;
import org.apache.flink.datastream.api.context.ProcessingTimeManager;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.impl.common.OutputCollector;
import org.apache.flink.datastream.impl.common.TimestampCollector;
import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
import org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedOneInputStreamProcessFunction;
import org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;

/**
 * Stream operator that drives a {@link OneInputStreamProcessFunction} over a single input.
 *
 * <p>During {@link #open()} the operator builds the runtime, partitioned and non-partitioned
 * contexts that are handed to the user function, wires up the event-time extension when the user
 * function is one of the event-time wrapper types, and finally opens the user function.
 *
 * <p>This base implementation has no notion of a current key ({@link #currentKey()} always
 * throws), reports asynchronous state processing as disabled, exposes no timer service and uses
 * {@link UnsupportedProcessingTimeManager}. The protected factory methods are the extension points
 * through which a subclass can supply different behavior.
 *
 * @param <IN> type of the incoming records
 * @param <OUT> type of the records emitted by the user function
 */
public class ProcessOperator<IN, OUT>
        extends AbstractAsyncStateUdfStreamOperator<OUT, OneInputStreamProcessFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {

    /** Runtime context built from the task and job information in {@link #open()}. */
    protected transient DefaultRuntimeContext context;

    /** Context passed to the user function for every processed record. */
    protected transient DefaultPartitionedContext<OUT> partitionedContext;

    /** Context passed to the user function for open, watermark and end-of-input callbacks. */
    protected transient NonPartitionedContext<OUT> nonPartitionedContext;

    /** Collector through which the user function emits its output. */
    protected transient TimestampCollector<OUT> outputCollector;

    /** Watermark declarations of this operator's configuration, keyed by their identifier. */
    protected transient Map<String, AbstractInternalWatermarkDeclaration<?>> watermarkDeclarationMap;

    /** Handler that receives event-time extension watermarks. */
    protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler;

    /**
     * Creates an operator around the given user function.
     *
     * @param userFunction the process function executed by this operator
     */
    public ProcessOperator(OneInputStreamProcessFunction<IN, OUT> userFunction) {
        super(userFunction);
    }

    /**
     * Initializes all contexts and collaborators and then opens the user function.
     *
     * <p>The initialization order matters: the output collector and the watermark declarations
     * are needed by the non-partitioned context, and the event-time extension must be initialized
     * before the user function's {@code open} is invoked.
     *
     * @throws Exception if the superclass or the user function fails to open
     */
    public void open() throws Exception {
        super.open();

        StreamingRuntimeContext operatorContext = this.getRuntimeContext();
        TaskInfo taskInfo = operatorContext.getTaskInfo();
        this.context =
                new DefaultRuntimeContext(
                        operatorContext.getJobInfo().getJobName(),
                        operatorContext.getJobType(),
                        taskInfo.getNumberOfParallelSubtasks(),
                        taskInfo.getMaxNumberOfParallelSubtasks(),
                        taskInfo.getTaskName(),
                        taskInfo.getIndexOfThisSubtask(),
                        taskInfo.getAttemptNumber(),
                        operatorContext.getMetricGroup());

        // Created exactly once; both contexts built below share this collector instance.
        this.outputCollector = this.getOutputCollector();
        this.watermarkDeclarationMap =
                this.config.getWatermarkDeclarations(this.getUserCodeClassloader()).stream()
                        .collect(
                                Collectors.toMap(
                                        AbstractInternalWatermarkDeclaration::getIdentifier,
                                        Function.identity()));
        this.partitionedContext =
                new DefaultPartitionedContext<>(
                        this.context,
                        this::currentKey,
                        this.getProcessorWithKey(),
                        this.getProcessingTimeManager(),
                        operatorContext,
                        this.getOperatorStateBackend());
        this.nonPartitionedContext = this.getNonPartitionedContext();
        this.partitionedContext.setNonPartitionedContext(this.nonPartitionedContext);

        // NOTE(review): the literal 1 is presumably the number of inputs of this operator;
        // confirm against EventTimeWatermarkHandler.
        this.eventTimeWatermarkHandler =
                new EventTimeWatermarkHandler(1, this.output, this.timeServiceManager);

        // The raw casts are kept on purpose: the type parameters of the two event-time function
        // classes are not visible from this file.
        if (this.userFunction instanceof ExtractEventTimeProcessFunction) {
            ((ExtractEventTimeProcessFunction) this.userFunction)
                    .initEventTimeExtension(
                            this.getExecutionConfig(),
                            this.partitionedContext.getNonPartitionedContext().getWatermarkManager(),
                            this.getProcessingTimeService());
        } else if (this.userFunction instanceof EventTimeWrappedOneInputStreamProcessFunction) {
            ((EventTimeWrappedOneInputStreamProcessFunction) this.userFunction)
                    .initEventTimeExtension(
                            this.getTimerService(),
                            this.getEventTimeSupplier(),
                            this.eventTimeWatermarkHandler);
        }

        this.userFunction.open(this.nonPartitionedContext);
    }

    /**
     * Hands one record to the user function.
     *
     * <p>The record is first passed to the collector via {@code setTimestampFromStreamRecord}
     * so that output emitted while processing it can take its timestamp from the input.
     *
     * @param element the incoming stream record
     * @throws Exception if the user function fails
     */
    public void processElement(StreamRecord<IN> element) throws Exception {
        this.outputCollector.setTimestampFromStreamRecord(element);
        this.userFunction.processRecord(
                element.getValue(), this.outputCollector, this.partitionedContext);
    }

    /**
     * Offers a watermark to the user function and applies the declared default handling if the
     * user function only peeked at it.
     *
     * <p>When the user function returns {@link WatermarkHandlingResult#PEEK} and the watermark's
     * declaration uses {@link WatermarkHandlingStrategy#FORWARD}, event-time extension watermarks
     * are routed through the {@link EventTimeWatermarkHandler}; every other watermark is emitted
     * downstream unchanged. In all other cases the operator does nothing further.
     *
     * @param watermark the received watermark event
     * @throws IllegalStateException if the watermark's identifier has no declaration registered
     *     with this operator
     * @throws Exception if the user function or the downstream emission fails
     */
    public void processWatermarkInternal(WatermarkEvent watermark) throws Exception {
        WatermarkHandlingResult userResult =
                this.userFunction.onWatermark(
                        watermark.getWatermark(), this.outputCollector, this.nonPartitionedContext);
        if (userResult != WatermarkHandlingResult.PEEK) {
            return;
        }

        AbstractInternalWatermarkDeclaration<?> declaration =
                this.watermarkDeclarationMap.get(watermark.getWatermark().getIdentifier());
        if (declaration == null) {
            // Fail with a descriptive message instead of a bare NullPointerException.
            throw new IllegalStateException(
                    "Received a watermark whose identifier is not declared for this operator: "
                            + watermark.getWatermark().getIdentifier());
        }
        if (declaration.getDefaultHandlingStrategy() != WatermarkHandlingStrategy.FORWARD) {
            return;
        }

        if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark())) {
            // NOTE(review): 0 is presumably the index of this operator's only input; confirm.
            this.eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 0);
        } else {
            this.output.emitWatermark(watermark);
        }
    }

    /**
     * Creates the collector used by the user function to emit records.
     *
     * @return a new collector writing to this operator's output
     */
    protected TimestampCollector<OUT> getOutputCollector() {
        return new OutputCollector<>(this.output);
    }

    /**
     * Notifies the user function that the input has ended.
     *
     * @throws Exception if the user function fails
     */
    public void endInput() throws Exception {
        this.userFunction.endInput(this.nonPartitionedContext);
    }

    /**
     * Returns the key of the record currently being processed.
     *
     * @return never returns normally in this implementation
     * @throws UnsupportedOperationException always, because this operator defines no key
     */
    protected Object currentKey() {
        throw new UnsupportedOperationException("The key is only defined for keyed operator");
    }

    /**
     * Builds the callback that runs an action with a given key set as the current key.
     *
     * <p>With asynchronous state processing enabled the action is delegated to
     * {@code asyncProcessWithKey}. Otherwise the current key is switched for the duration of the
     * action and restored afterwards, even if the action throws.
     *
     * @return a consumer taking the action to run and the key to run it under
     */
    protected BiConsumer<Runnable, Object> getProcessorWithKey() {
        if (this.isAsyncStateProcessingEnabled()) {
            return (r, k) -> this.asyncProcessWithKey(k, r::run);
        }
        return (r, k) -> {
            Object oldKey = this.currentKey();
            this.setCurrentKey(k);
            try {
                r.run();
            } finally {
                this.setCurrentKey(oldKey);
            }
        };
    }

    /**
     * Returns the processing-time manager exposed through the partitioned context.
     *
     * @return the shared {@link UnsupportedProcessingTimeManager} instance
     */
    protected ProcessingTimeManager getProcessingTimeManager() {
        return UnsupportedProcessingTimeManager.INSTANCE;
    }

    /**
     * Creates the non-partitioned context handed to the user function.
     *
     * <p>Reads {@link #context}, {@link #partitionedContext}, {@link #outputCollector} and
     * {@link #watermarkDeclarationMap}, so it must be called after those fields are initialized.
     *
     * @return a new non-partitioned context
     */
    protected NonPartitionedContext<OUT> getNonPartitionedContext() {
        // NOTE(review): the `false` flag and `null` argument presumably mark the context as
        // non-keyed without a key set; confirm against DefaultNonPartitionedContext.
        return new DefaultNonPartitionedContext<OUT>(
                this.context,
                this.partitionedContext,
                this.outputCollector,
                false,
                null,
                this.output,
                this.watermarkDeclarationMap);
    }

    /**
     * Closes the operator and then the user function.
     *
     * <p>The user function is closed even when closing the superclass fails, so that it always
     * gets the chance to release its resources. In that case the superclass failure is rethrown
     * and a failure of the user function's {@code close} is attached to it as a suppressed
     * exception.
     *
     * @throws Exception if closing the superclass or the user function fails
     */
    public void close() throws Exception {
        try {
            super.close();
        } catch (Throwable closeFailure) {
            try {
                this.userFunction.close();
            } catch (Throwable userCloseFailure) {
                closeFailure.addSuppressed(userCloseFailure);
            }
            throw closeFailure;
        }
        this.userFunction.close();
    }

    /**
     * Tells whether asynchronous state processing is enabled.
     *
     * @return always {@code false} in this implementation
     */
    public boolean isAsyncStateProcessingEnabled() {
        return false;
    }

    /**
     * Returns the timer service passed to the event-time extension.
     *
     * @return always {@code null} in this implementation
     */
    protected InternalTimerService<VoidNamespace> getTimerService() {
        return null;
    }

    /**
     * Returns a supplier of the current event time.
     *
     * <p>The handler field is read lazily on every call of the supplier, not when the supplier
     * is created.
     *
     * @return a supplier yielding the last watermark emitted by the event-time watermark handler
     */
    protected Supplier<Long> getEventTimeSupplier() {
        return () -> this.eventTimeWatermarkHandler.getLastEmitWatermark();
    }
}

