/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.datastream;

import java.util.Objects;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.asyncprocessing.operators.co.AsyncKeyedCoProcessOperator;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.apache.flink.util.Utils;

/**
 * Pairs two {@link DataStream}s, whose element types may differ, so that two-input operators
 * (co-map, co-flat-map, co-process, or a custom {@link TwoInputStreamOperator}) can be applied
 * to both of them at once.
 *
 * <p>When both inputs are {@link KeyedStream}s, the resulting transformation is configured with
 * the key selectors and key type of the inputs, and keyed co-process functions become available.
 *
 * @param <IN1> element type of the first input stream
 * @param <IN2> element type of the second input stream
 */
@Public
public class ConnectedStreams<IN1, IN2> {
    protected final StreamExecutionEnvironment environment;
    protected final DataStream<IN1> inputStream1;
    protected final DataStream<IN2> inputStream2;

    /**
     * Whether {@link #process(KeyedCoProcessFunction, TypeInformation)} should build the async
     * state variant of the keyed co-process operator.
     */
    protected boolean isEnableAsyncState;

    /**
     * Creates a connected view over the two given streams.
     *
     * <p>Async state starts out enabled only if both inputs are {@link KeyedStream}s and both of
     * them already report async state as enabled.
     *
     * @param env execution environment the resulting operators are registered with
     * @param input1 first input stream
     * @param input2 second input stream
     * @throws NullPointerException if any argument is {@code null}
     */
    protected ConnectedStreams(StreamExecutionEnvironment env, DataStream<IN1> input1, DataStream<IN2> input2) {
        this.environment = Objects.requireNonNull(env);
        this.inputStream1 = Objects.requireNonNull(input1);
        this.inputStream2 = Objects.requireNonNull(input2);
        this.isEnableAsyncState =
                this.inputStream1 instanceof KeyedStream
                        && this.inputStream2 instanceof KeyedStream
                        && ((KeyedStream<?, ?>) this.inputStream1).isEnableAsyncState()
                        && ((KeyedStream<?, ?>) this.inputStream2).isEnableAsyncState();
    }

    /** Returns the execution environment this instance was created with. */
    public StreamExecutionEnvironment getExecutionEnvironment() {
        return this.environment;
    }

    /** Returns the first input stream. */
    public DataStream<IN1> getFirstInput() {
        return this.inputStream1;
    }

    /** Returns the second input stream. */
    public DataStream<IN2> getSecondInput() {
        return this.inputStream2;
    }

    /** Returns the type information of the first input stream. */
    public TypeInformation<IN1> getType1() {
        return this.inputStream1.getType();
    }

    /** Returns the type information of the second input stream. */
    public TypeInformation<IN2> getType2() {
        return this.inputStream2.getType();
    }

    /**
     * Keys both inputs by a single field position each and returns a new {@code ConnectedStreams}
     * over the keyed inputs.
     *
     * @param keyPosition1 key field position in the first input
     * @param keyPosition2 key field position in the second input
     * @return a new instance wrapping the two keyed streams
     * @deprecated prefer {@link #keyBy(KeySelector, KeySelector)}
     */
    @Deprecated
    public ConnectedStreams<IN1, IN2> keyBy(int keyPosition1, int keyPosition2) {
        return new ConnectedStreams<>(
                this.environment,
                ConnectedStreams.keyBy(this.inputStream1, keyPosition1),
                ConnectedStreams.keyBy(this.inputStream2, keyPosition2));
    }

    /**
     * Keys both inputs by several field positions each and returns a new
     * {@code ConnectedStreams} over the keyed inputs.
     *
     * @param keyPositions1 key field positions in the first input
     * @param keyPositions2 key field positions in the second input
     * @return a new instance wrapping the two keyed streams
     * @deprecated prefer {@link #keyBy(KeySelector, KeySelector)}
     */
    @Deprecated
    public ConnectedStreams<IN1, IN2> keyBy(int[] keyPositions1, int[] keyPositions2) {
        return new ConnectedStreams<>(
                this.environment,
                ConnectedStreams.keyBy(this.inputStream1, keyPositions1),
                ConnectedStreams.keyBy(this.inputStream2, keyPositions2));
    }

    /**
     * Keys both inputs by a single field expression each and returns a new
     * {@code ConnectedStreams} over the keyed inputs.
     *
     * @param field1 key field expression for the first input
     * @param field2 key field expression for the second input
     * @return a new instance wrapping the two keyed streams
     * @deprecated prefer {@link #keyBy(KeySelector, KeySelector)}
     */
    @Deprecated
    public ConnectedStreams<IN1, IN2> keyBy(String field1, String field2) {
        return new ConnectedStreams<>(
                this.environment,
                ConnectedStreams.keyBy(this.inputStream1, field1),
                ConnectedStreams.keyBy(this.inputStream2, field2));
    }

    /**
     * Keys both inputs by several field expressions each and returns a new
     * {@code ConnectedStreams} over the keyed inputs.
     *
     * @param fields1 key field expressions for the first input
     * @param fields2 key field expressions for the second input
     * @return a new instance wrapping the two keyed streams
     * @deprecated prefer {@link #keyBy(KeySelector, KeySelector)}
     */
    @Deprecated
    public ConnectedStreams<IN1, IN2> keyBy(String[] fields1, String[] fields2) {
        return new ConnectedStreams<>(
                this.environment,
                ConnectedStreams.keyBy(this.inputStream1, fields1),
                ConnectedStreams.keyBy(this.inputStream2, fields2));
    }

    /**
     * Keys a single stream by field positions. Streams whose type is a basic or primitive array
     * type go through an array key selector; every other type goes through expression keys.
     */
    private static <T> DataStream<T> keyBy(DataStream<T> inputStream, int ... keyPositions) {
        if (inputStream.getType() instanceof BasicArrayTypeInfo || inputStream.getType() instanceof PrimitiveArrayTypeInfo) {
            return inputStream.keyBy(KeySelectorUtil.getSelectorForArray(keyPositions, inputStream.getType()));
        }
        return inputStream.keyBy(new Keys.ExpressionKeys<T>(keyPositions, inputStream.getType()));
    }

    /** Keys a single stream by field expressions. */
    private static <T> DataStream<T> keyBy(DataStream<T> inputStream, String ... fields) {
        return inputStream.keyBy(new Keys.ExpressionKeys<T>(fields, inputStream.getType()));
    }

    /**
     * Keys both inputs with the given selectors and returns a new {@code ConnectedStreams} over
     * the keyed inputs.
     *
     * @param keySelector1 key extractor for the first input
     * @param keySelector2 key extractor for the second input
     * @param <KEY> key type shared by both selectors
     * @return a new instance wrapping the two keyed streams
     */
    public <KEY> ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, KEY> keySelector1, KeySelector<IN2, KEY> keySelector2) {
        return new ConnectedStreams<>(
                this.environment,
                this.inputStream1.keyBy(keySelector1),
                this.inputStream2.keyBy(keySelector2));
    }

    /**
     * Keys both inputs with the given selectors and an explicit key type, and returns a new
     * {@code ConnectedStreams} over the keyed inputs.
     *
     * @param keySelector1 key extractor for the first input
     * @param keySelector2 key extractor for the second input
     * @param keyType type information of the key, passed to both {@code keyBy} calls
     * @param <KEY> key type shared by both selectors
     * @return a new instance wrapping the two keyed streams
     */
    public <KEY> ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, KEY> keySelector1, KeySelector<IN2, KEY> keySelector2, TypeInformation<KEY> keyType) {
        return new ConnectedStreams<>(
                this.environment,
                this.inputStream1.keyBy(keySelector1, keyType),
                this.inputStream2.keyBy(keySelector2, keyType));
    }

    /**
     * Applies a {@link CoMapFunction}, deriving the output type from the function via
     * {@link TypeExtractor}.
     *
     * @param coMapper the co-map function
     * @param <R> output element type
     * @return the resulting stream
     */
    public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper) {
        TypeInformation<R> outTypeInfo =
                TypeExtractor.getBinaryOperatorReturnType(
                        coMapper,
                        CoMapFunction.class,
                        0,
                        1,
                        2,
                        TypeExtractor.NO_INDEX,
                        this.getType1(),
                        this.getType2(),
                        Utils.getCallLocationName(),
                        true);
        return this.map(coMapper, outTypeInfo);
    }

    /**
     * Applies a {@link CoMapFunction} with an explicitly supplied output type. The function is
     * wrapped in a {@link CoStreamMap} operator named {@code "Co-Map"}.
     *
     * @param coMapper the co-map function
     * @param outputType type information of the produced elements
     * @param <R> output element type
     * @return the resulting stream
     */
    public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper, TypeInformation<R> outputType) {
        return this.transform("Co-Map", outputType, new CoStreamMap<IN1, IN2, R>(this.inputStream1.clean(coMapper)));
    }

    /**
     * Applies a {@link CoFlatMapFunction}, deriving the output type from the function via
     * {@link TypeExtractor}.
     *
     * @param coFlatMapper the co-flat-map function
     * @param <R> output element type
     * @return the resulting stream
     */
    public <R> SingleOutputStreamOperator<R> flatMap(CoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
        TypeInformation<R> outTypeInfo =
                TypeExtractor.getBinaryOperatorReturnType(
                        coFlatMapper,
                        CoFlatMapFunction.class,
                        0,
                        1,
                        2,
                        TypeExtractor.NO_INDEX,
                        this.getType1(),
                        this.getType2(),
                        Utils.getCallLocationName(),
                        true);
        return this.flatMap(coFlatMapper, outTypeInfo);
    }

    /**
     * Applies a {@link CoFlatMapFunction} with an explicitly supplied output type. The function
     * is wrapped in a {@link CoStreamFlatMap} operator named {@code "Co-Flat Map"}.
     *
     * @param coFlatMapper the co-flat-map function
     * @param outputType type information of the produced elements
     * @param <R> output element type
     * @return the resulting stream
     */
    public <R> SingleOutputStreamOperator<R> flatMap(CoFlatMapFunction<IN1, IN2, R> coFlatMapper, TypeInformation<R> outputType) {
        return this.transform("Co-Flat Map", outputType, new CoStreamFlatMap<IN1, IN2, R>(this.inputStream1.clean(coFlatMapper)));
    }

    /**
     * Applies a {@link CoProcessFunction}, deriving the output type from the function via
     * {@link TypeExtractor}.
     *
     * @param coProcessFunction the co-process function
     * @param <R> output element type
     * @return the resulting stream
     */
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> process(CoProcessFunction<IN1, IN2, R> coProcessFunction) {
        TypeInformation<R> outTypeInfo =
                TypeExtractor.getBinaryOperatorReturnType(
                        coProcessFunction,
                        CoProcessFunction.class,
                        0,
                        1,
                        2,
                        TypeExtractor.NO_INDEX,
                        this.getType1(),
                        this.getType2(),
                        Utils.getCallLocationName(),
                        true);
        return this.process(coProcessFunction, outTypeInfo);
    }

    /**
     * Applies a {@link CoProcessFunction} with an explicitly supplied output type.
     *
     * <p>If both inputs are {@link KeyedStream}s the function runs inside a
     * {@link LegacyKeyedCoProcessOperator}; otherwise a {@link CoProcessOperator} is used. The
     * operator is registered under the name {@code "Co-Process"}.
     *
     * @param coProcessFunction the co-process function
     * @param outputType type information of the produced elements
     * @param <R> output element type
     * @return the resulting stream
     */
    @Internal
    public <R> SingleOutputStreamOperator<R> process(CoProcessFunction<IN1, IN2, R> coProcessFunction, TypeInformation<R> outputType) {
        final TwoInputStreamOperator<IN1, IN2, R> operator;
        if (this.inputStream1 instanceof KeyedStream && this.inputStream2 instanceof KeyedStream) {
            operator = new LegacyKeyedCoProcessOperator<>(this.inputStream1.clean(coProcessFunction));
        } else {
            operator = new CoProcessOperator<>(this.inputStream1.clean(coProcessFunction));
        }
        return this.transform("Co-Process", outputType, operator);
    }

    /**
     * Applies a {@link KeyedCoProcessFunction}, deriving the output type from the function via
     * {@link TypeExtractor}.
     *
     * @param keyedCoProcessFunction the keyed co-process function
     * @param <K> key type
     * @param <R> output element type
     * @return the resulting stream
     * @throws UnsupportedOperationException if either input is not a {@link KeyedStream}
     */
    @PublicEvolving
    public <K, R> SingleOutputStreamOperator<R> process(KeyedCoProcessFunction<K, IN1, IN2, R> keyedCoProcessFunction) {
        // Type-argument indices are shifted by one compared to the other functions because
        // KeyedCoProcessFunction declares the key type K as its first type parameter.
        TypeInformation<R> outTypeInfo =
                TypeExtractor.getBinaryOperatorReturnType(
                        keyedCoProcessFunction,
                        KeyedCoProcessFunction.class,
                        1,
                        2,
                        3,
                        TypeExtractor.NO_INDEX,
                        this.getType1(),
                        this.getType2(),
                        Utils.getCallLocationName(),
                        true);
        return this.process(keyedCoProcessFunction, outTypeInfo);
    }

    /**
     * Applies a {@link KeyedCoProcessFunction} with an explicitly supplied output type.
     *
     * <p>Uses an {@link AsyncKeyedCoProcessOperator} when {@link #isEnableAsyncState} is set and
     * a {@link KeyedCoProcessOperator} otherwise. The operator is registered under the name
     * {@code "Co-Keyed-Process"}.
     *
     * @param keyedCoProcessFunction the keyed co-process function
     * @param outputType type information of the produced elements
     * @param <K> key type
     * @param <R> output element type
     * @return the resulting stream
     * @throws UnsupportedOperationException if either input is not a {@link KeyedStream}
     */
    @Internal
    public <K, R> SingleOutputStreamOperator<R> process(KeyedCoProcessFunction<K, IN1, IN2, R> keyedCoProcessFunction, TypeInformation<R> outputType) {
        if (!(this.inputStream1 instanceof KeyedStream) || !(this.inputStream2 instanceof KeyedStream)) {
            throw new UnsupportedOperationException("KeyedCoProcessFunction can only be used when both input streams are of type KeyedStream.");
        }
        final TwoInputStreamOperator<IN1, IN2, R> operator;
        if (this.isEnableAsyncState) {
            operator = new AsyncKeyedCoProcessOperator<>(this.inputStream1.clean(keyedCoProcessFunction));
        } else {
            operator = new KeyedCoProcessOperator<>(this.inputStream1.clean(keyedCoProcessFunction));
        }
        return this.transform("Co-Keyed-Process", outputType, operator);
    }

    /**
     * Applies a user-supplied two-input operator. The operator is wrapped via
     * {@link SimpleOperatorFactory#of}.
     *
     * @param functionName name given to the resulting transformation
     * @param outTypeInfo type information of the produced elements
     * @param operator the two-input operator to run
     * @param <R> output element type
     * @return the resulting stream
     */
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String functionName, TypeInformation<R> outTypeInfo, TwoInputStreamOperator<IN1, IN2, R> operator) {
        return this.doTransform(functionName, outTypeInfo, SimpleOperatorFactory.of(operator));
    }

    /**
     * Applies a two-input operator produced by the given factory.
     *
     * @param functionName name given to the resulting transformation
     * @param outTypeInfo type information of the produced elements
     * @param operatorFactory factory creating the two-input operator
     * @param <R> output element type
     * @return the resulting stream
     */
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String functionName, TypeInformation<R> outTypeInfo, TwoInputStreamOperatorFactory<IN1, IN2, R> operatorFactory) {
        return this.doTransform(functionName, outTypeInfo, operatorFactory);
    }

    /**
     * Builds the {@link TwoInputTransformation} for the given operator factory, configures keyed
     * state for every input that is a {@link KeyedStream}, registers the transformation with the
     * environment and returns the resulting stream.
     *
     * @throws UnsupportedOperationException if both inputs are keyed and their key types differ
     */
    private <R> SingleOutputStreamOperator<R> doTransform(String functionName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) {
        // Return values are intentionally discarded. NOTE(review): presumably these calls exist
        // to surface missing-type-information errors early -- confirm against DataStream#getType.
        this.inputStream1.getType();
        this.inputStream2.getType();

        // The meaning of the trailing `false` is defined by TwoInputTransformation's constructor.
        TwoInputTransformation<IN1, IN2, R> transform =
                new TwoInputTransformation<>(
                        this.inputStream1.getTransformation(),
                        this.inputStream2.getTransformation(),
                        functionName,
                        operatorFactory,
                        outTypeInfo,
                        this.environment.getParallelism(),
                        false);

        TypeInformation<?> keyType = null;
        if (this.inputStream1 instanceof KeyedStream) {
            KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) this.inputStream1;
            keyType = keyedInput1.getKeyType();
            transform.setStateKeySelectors(keyedInput1.getKeySelector(), null);
            transform.setStateKeyType(keyType);
        }
        if (this.inputStream2 instanceof KeyedStream) {
            KeyedStream<IN2, ?> keyedInput2 = (KeyedStream<IN2, ?>) this.inputStream2;
            TypeInformation<?> keyType2 = keyedInput2.getKeyType();
            // Only compare when the first input contributed a key type.
            if (keyType != null && !(keyType.canEqual(keyType2) && keyType.equals(keyType2))) {
                throw new UnsupportedOperationException("Key types of input KeyedStreams don't match: " + keyType + " and " + keyType2 + ".");
            }
            // Keep whatever selector was (or was not) set for the first input.
            transform.setStateKeySelectors(transform.getStateKeySelector1(), keyedInput2.getKeySelector());
            // May overwrite the key type set above; the check guarantees it is an equal one.
            transform.setStateKeyType(keyType2);
        }

        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator<>(this.environment, transform);
        this.getExecutionEnvironment().addOperator(transform);
        return returnStream;
    }

    /**
     * Switches this instance, and both underlying {@link KeyedStream} inputs, to async state.
     *
     * <p>Note that this mutates the two input streams as well as this object.
     *
     * @return this instance, for call chaining
     * @throws UnsupportedOperationException if either input is not a {@link KeyedStream}
     */
    @Experimental
    public ConnectedStreams<IN1, IN2> enableAsyncState() {
        if (!(this.inputStream1 instanceof KeyedStream) || !(this.inputStream2 instanceof KeyedStream)) {
            throw new UnsupportedOperationException("The connected streams do not support async state, please ensure that two input streams of your connected streams are keyed stream(not behind a keyBy()).");
        }
        ((KeyedStream<?, ?>) this.inputStream1).enableAsyncState();
        ((KeyedStream<?, ?>) this.inputStream2).enableAsyncState();
        this.isEnableAsyncState = true;
        return this;
    }
}

