/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.collect;

import java.time.Duration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator;

public class CollectSinkOperatorFactory<IN>
extends SimpleUdfStreamOperatorFactory<Object>
implements CoordinatedOperatorFactory<Object> {
    private static final long serialVersionUID = 1L;
    private final CollectSinkOperator<IN> operator = (CollectSinkOperator)this.getOperator();
    private final int socketTimeoutMillis;
    public static final ConfigOption<MemorySize> MAX_BATCH_SIZE = ConfigOptions.key("collect-sink.batch-size.max").memoryType().defaultValue(MemorySize.ofMebiBytes(2L));
    public static final ConfigOption<Duration> SOCKET_TIMEOUT = ConfigOptions.key("collect-sink.socket-timeout").durationType().defaultValue(Duration.ofSeconds(10L));

    public CollectSinkOperatorFactory(TypeSerializer<IN> serializer, String accumulatorName) {
        this(serializer, accumulatorName, MAX_BATCH_SIZE.defaultValue(), SOCKET_TIMEOUT.defaultValue());
    }

    public CollectSinkOperatorFactory(TypeSerializer<IN> serializer, String accumulatorName, MemorySize maxBatchSize, Duration socketTimeout) {
        super(new CollectSinkOperator<IN>(serializer, maxBatchSize.getBytes(), accumulatorName));
        this.socketTimeoutMillis = (int)socketTimeout.toMillis();
    }

    public int getSocketTimeoutMillis() {
        return this.socketTimeoutMillis;
    }

    @Override
    public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) {
        CollectSinkOperator operator = (CollectSinkOperator)super.createStreamOperator(parameters);
        OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
        OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
        operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorId));
        eventDispatcher.registerEventHandler(operatorId, operator);
        return (T)operator;
    }

    @Override
    public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) {
        return new CollectSinkOperatorCoordinator.Provider(operatorID, this.socketTimeoutMillis);
    }
}

