/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.watermark.WatermarkDeclaration;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.util.watermark.WatermarkUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;

/**
 * Operator factory that builds {@link SourceOperator} instances for a given {@link Source} and
 * provides the matching {@link SourceCoordinatorProvider} for that source.
 *
 * @param <OUT> type of the records emitted by the created operator
 */
public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT>
        implements CoordinatedOperatorFactory<OUT>, ProcessingTimeServiceAware {

    private static final long serialVersionUID = 1L;

    /** The source whose readers are run by the created operator. */
    private final Source<OUT, ?, ?> source;

    /** Watermark strategy handed to the operator; also supplies the alignment parameters. */
    private final WatermarkStrategy<OUT> watermarkStrategy;

    /** Flag forwarded unchanged to the {@link SourceOperator} constructor. */
    private final boolean emitProgressiveWatermarks;

    /** Worker thread count forwarded to the {@link SourceCoordinatorProvider}. */
    private final int numCoordinatorWorkerThread;

    /** Optional listening ID forwarded to the {@link SourceCoordinatorProvider}; may be null. */
    @Nullable private String coordinatorListeningID;

    /**
     * Creates a factory with progressive watermarks enabled and one coordinator worker thread.
     *
     * @param source the source to create operators for, must not be null
     * @param watermarkStrategy the watermark strategy, must not be null
     */
    public SourceOperatorFactory(
            Source<OUT, ?, ?> source, WatermarkStrategy<OUT> watermarkStrategy) {
        this(source, watermarkStrategy, true, 1);
    }

    /**
     * Creates a factory with one coordinator worker thread.
     *
     * @param source the source to create operators for, must not be null
     * @param watermarkStrategy the watermark strategy, must not be null
     * @param emitProgressiveWatermarks flag forwarded to the created operator
     */
    public SourceOperatorFactory(
            Source<OUT, ?, ?> source,
            WatermarkStrategy<OUT> watermarkStrategy,
            boolean emitProgressiveWatermarks) {
        this(source, watermarkStrategy, emitProgressiveWatermarks, 1);
    }

    /**
     * Creates a fully configured factory.
     *
     * @param source the source to create operators for, must not be null
     * @param watermarkStrategy the watermark strategy, must not be null
     * @param emitProgressiveWatermarks flag forwarded to the created operator
     * @param numCoordinatorWorkerThread worker thread count forwarded to the coordinator provider
     * @throws NullPointerException if {@code source} or {@code watermarkStrategy} is null
     */
    public SourceOperatorFactory(
            Source<OUT, ?, ?> source,
            WatermarkStrategy<OUT> watermarkStrategy,
            boolean emitProgressiveWatermarks,
            int numCoordinatorWorkerThread) {
        this.source = Preconditions.checkNotNull(source);
        this.watermarkStrategy = Preconditions.checkNotNull(watermarkStrategy);
        this.emitProgressiveWatermarks = emitProgressiveWatermarks;
        this.numCoordinatorWorkerThread = numCoordinatorWorkerThread;
    }

    /**
     * Returns the boundedness reported by the wrapped source.
     *
     * @return the result of {@link Source#getBoundedness()}
     */
    public Boundedness getBoundedness() {
        return this.source.getBoundedness();
    }

    /**
     * Sets the listening ID that is passed to the coordinator provider created by {@link
     * #getCoordinatorProvider(String, OperatorID)}.
     *
     * @param coordinatorListeningID the ID to forward, or null
     */
    public void setCoordinatorListeningID(@Nullable String coordinatorListeningID) {
        this.coordinatorListeningID = coordinatorListeningID;
    }

    /**
     * Creates the {@link SourceOperator} for this factory's source and registers it as the
     * operator event handler for its operator ID.
     *
     * @param parameters runtime parameters supplying the stream config, event dispatcher,
     *     processing time service and containing task
     * @return the newly created source operator, cast to the requested operator type
     */
    @Override
    public <T extends StreamOperator<OUT>> T createStreamOperator(
            StreamOperatorParameters<OUT> parameters) {
        final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
        final OperatorEventGateway gateway =
                parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);

        final SourceOperator<OUT, ?> sourceOperator =
                instantiateSourceOperator(
                        parameters,
                        this.source::createReader,
                        gateway,
                        this.source.getSplitSerializer(),
                        this.watermarkStrategy,
                        parameters.getProcessingTimeService(),
                        parameters
                                .getContainingTask()
                                .getEnvironment()
                                .getTaskManagerInfo()
                                .getConfiguration(),
                        parameters
                                .getContainingTask()
                                .getEnvironment()
                                .getTaskManagerInfo()
                                .getTaskManagerExternalAddress(),
                        this.emitProgressiveWatermarks,
                        parameters.getContainingTask().getCanEmitBatchOfRecords(),
                        this.getSourceWatermarkDeclarations(),
                        this.source instanceof SupportsSplitReassignmentOnRecovery);

        // The operator must be reachable for operator events addressed to its operator ID.
        parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator);

        // The interface forces a caller-chosen operator type; the cast cannot be checked here.
        @SuppressWarnings("unchecked")
        final T typedOperator = (T) sourceOperator;
        return typedOperator;
    }

    /**
     * Creates the coordinator provider for the wrapped source.
     *
     * @param operatorName name passed to the provider
     * @param operatorID operator ID passed to the provider
     * @return a {@link SourceCoordinatorProvider} configured with this factory's source, worker
     *     thread count, watermark alignment parameters and coordinator listening ID
     */
    @Override
    public OperatorCoordinator.Provider getCoordinatorProvider(
            String operatorName, OperatorID operatorID) {
        return new SourceCoordinatorProvider<>(
                operatorName,
                operatorID,
                this.source,
                this.numCoordinatorWorkerThread,
                this.watermarkStrategy.getAlignmentParameters(),
                this.coordinatorListeningID);
    }

    /**
     * Returns the operator class produced by this factory.
     *
     * @param classLoader unused
     * @return {@code SourceOperator.class}
     */
    @SuppressWarnings("rawtypes")
    @Override
    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
        return SourceOperator.class;
    }

    /**
     * Marks this factory as producing a stream source.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isStreamSource() {
        return true;
    }

    /**
     * Tells whether the wrapped source accepts an output type.
     *
     * @return {@code true} if the source implements {@link OutputTypeConfigurable}
     */
    @Override
    public boolean isOutputTypeConfigurable() {
        return this.source instanceof OutputTypeConfigurable;
    }

    /**
     * Forwards the output type to the wrapped source if it implements {@link
     * OutputTypeConfigurable}; otherwise does nothing.
     *
     * @param type the output type information
     * @param executionConfig the execution config passed along with the type
     */
    @SuppressWarnings("unchecked")
    @Override
    public void setOutputType(TypeInformation<OUT> type, ExecutionConfig executionConfig) {
        if (this.source instanceof OutputTypeConfigurable) {
            ((OutputTypeConfigurable<OUT>) this.source).setOutputType(type, executionConfig);
        }
    }

    /**
     * Returns the watermark declarations of the wrapped source.
     *
     * @return the result of {@link Source#declareWatermarks()}
     */
    public Set<? extends WatermarkDeclaration> getSourceWatermarkDeclarations() {
        return this.source.declareWatermarks();
    }

    /**
     * Builds a {@link SourceOperator} with a concrete split type from loosely typed inputs.
     *
     * <p>The reader factory and the split serializer arrive with a wildcard split type and are
     * re-bound to {@code SplitT} through unchecked casts. The only caller in this class obtains
     * both from the same {@link Source} instance.
     *
     * <p>The watermark declarations are converted to their internal form and collapsed into a map
     * from declaration identifier to its aligned flag before being handed to the operator.
     */
    @SuppressWarnings("unchecked")
    private static <T, SplitT extends SourceSplit> SourceOperator<T, SplitT> instantiateSourceOperator(
            StreamOperatorParameters<T> parameters,
            FunctionWithException<SourceReaderContext, SourceReader<T, ?>, Exception> readerFactory,
            OperatorEventGateway eventGateway,
            SimpleVersionedSerializer<?> splitSerializer,
            WatermarkStrategy<T> watermarkStrategy,
            ProcessingTimeService timeService,
            Configuration config,
            String localHostName,
            boolean emitProgressiveWatermarks,
            StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
            Collection<? extends WatermarkDeclaration> watermarkDeclarations,
            boolean supportsSplitReassignmentOnRecovery) {

        // Cast the generics away first, then cast them back more strictly typed.
        final FunctionWithException<SourceReaderContext, SourceReader<T, SplitT>, Exception>
                typedReaderFactory =
                        (FunctionWithException<
                                        SourceReaderContext, SourceReader<T, SplitT>, Exception>)
                                (FunctionWithException<?, ?, ?>) readerFactory;

        final SimpleVersionedSerializer<SplitT> typedSplitSerializer =
                (SimpleVersionedSerializer<SplitT>) splitSerializer;

        final Map<String, Boolean> watermarkIsAlignedMap =
                WatermarkUtils.convertToInternalWatermarkDeclarations(
                                new HashSet<WatermarkDeclaration>(watermarkDeclarations))
                        .stream()
                        .collect(
                                Collectors.toMap(
                                        AbstractInternalWatermarkDeclaration::getIdentifier,
                                        AbstractInternalWatermarkDeclaration::isAligned));

        return new SourceOperator<>(
                parameters,
                typedReaderFactory,
                eventGateway,
                typedSplitSerializer,
                watermarkStrategy,
                timeService,
                config,
                localHostName,
                emitProgressiveWatermarks,
                canEmitBatchOfRecords,
                watermarkIsAlignedMap,
                supportsSplitReassignmentOnRecovery);
    }
}

