/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.co;

import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

/**
 * A {@link TwoInputStreamOperator} that drives a {@link KeyedBroadcastProcessFunction}.
 *
 * <p>Records arriving on the first input are handed to
 * {@link KeyedBroadcastProcessFunction#processElement} together with a context that only exposes
 * a read-only view of the broadcast state. Records arriving on the second input are handed to
 * {@link KeyedBroadcastProcessFunction#processBroadcastElement} together with a context that
 * exposes writable broadcast state and {@code applyToKeyedState}. Timers registered through the
 * {@code "user-timers"} timer service are forwarded to
 * {@link KeyedBroadcastProcessFunction#onTimer}.
 *
 * @param <KS> type of the key carried by fired timers and reported by {@code getCurrentKey()}
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <OUT> element type emitted by the user function
 */
@Internal
public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
        extends AbstractUdfStreamOperator<OUT, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>>
        implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<KS, VoidNamespace> {

    private static final long serialVersionUID = 5926499536290284870L;

    /** Message of the exception thrown when a broadcast state descriptor was never registered. */
    private static final String UNKNOWN_STATE_MESSAGE =
            "The requested state does not exist. Check for typos in your state descriptor, or specify the state descriptor in the datastream.broadcast(...) call if you forgot to register it.";

    /** Descriptors of every broadcast state the user function may request. */
    private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;

    // Everything below is (re)created in open(); none of it is serialized with the operator.
    private transient TimestampedCollector<OUT> collector;
    private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
    private transient ReadWriteContextImpl rwContext;
    private transient ReadOnlyContextImpl rContext;
    private transient OnTimerContextImpl onTimerContext;

    /**
     * Creates the operator.
     *
     * @param function the user function to invoke
     * @param broadcastStateDescriptors descriptors of the broadcast states to make available
     * @throws NullPointerException if {@code broadcastStateDescriptors} is {@code null}
     */
    public CoBroadcastWithKeyedOperator(
            KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
            List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
        super(function);
        this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors);
    }

    /**
     * Sets up the timer service, the output collector, one broadcast state per registered
     * descriptor, and the three reusable contexts passed to the user function.
     */
    @Override
    public void open() throws Exception {
        super.open();

        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        TimerService timerService = new SimpleTimerService(internalTimerService);

        collector = new TimestampedCollector<>(output);

        broadcastStates =
                CollectionUtil.newHashMapWithExpectedSize(broadcastStateDescriptors.size());
        for (MapStateDescriptor<?, ?> descriptor : broadcastStateDescriptors) {
            broadcastStates.put(descriptor, getOperatorStateBackend().getBroadcastState(descriptor));
        }

        rwContext =
                new ReadWriteContextImpl(
                        getExecutionConfig(),
                        getKeyedStateBackend(),
                        userFunction,
                        broadcastStates,
                        timerService);
        rContext =
                new ReadOnlyContextImpl(
                        getExecutionConfig(), userFunction, broadcastStates, timerService);
        onTimerContext =
                new OnTimerContextImpl(
                        getExecutionConfig(), userFunction, broadcastStates, timerService);
    }

    /**
     * Forwards a record from the first input to {@code processElement} with the read-only
     * context.
     */
    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        collector.setTimestamp(element);
        rContext.setElement(element);
        try {
            userFunction.processElement(element.getValue(), rContext, collector);
        } finally {
            // Always detach the record so a failed invocation cannot leave a stale element behind.
            rContext.setElement(null);
        }
    }

    /**
     * Forwards a record from the second input to {@code processBroadcastElement} with the
     * read-write context.
     */
    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        collector.setTimestamp(element);
        rwContext.setElement(element);
        try {
            userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
        } finally {
            // Always detach the record so a failed invocation cannot leave a stale element behind.
            rwContext.setElement(null);
        }
    }

    /** Fires an event-time timer; emitted records are stamped with the timer's timestamp. */
    @Override
    public void onEventTime(InternalTimer<KS, VoidNamespace> timer) throws Exception {
        collector.setAbsoluteTimestamp(timer.getTimestamp());
        fireTimer(TimeDomain.EVENT_TIME, timer);
    }

    /** Fires a processing-time timer; the collector's timestamp is erased beforehand. */
    @Override
    public void onProcessingTime(InternalTimer<KS, VoidNamespace> timer) throws Exception {
        collector.eraseTimestamp();
        fireTimer(TimeDomain.PROCESSING_TIME, timer);
    }

    /** Binds the timer to the timer context, calls {@code onTimer}, then unbinds it again. */
    private void fireTimer(TimeDomain domain, InternalTimer<KS, VoidNamespace> timer)
            throws Exception {
        onTimerContext.timeDomain = domain;
        onTimerContext.timer = timer;
        try {
            userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
        } finally {
            // Reset even on failure so the context never reports a timer that is no longer firing.
            onTimerContext.timeDomain = null;
            onTimerContext.timer = null;
        }
    }

    /**
     * Looks up the broadcast state registered for {@code stateDescriptor}.
     *
     * @throws NullPointerException if {@code stateDescriptor} is {@code null}
     * @throws IllegalArgumentException if no state is registered for the descriptor
     */
    private static <K, V> BroadcastState<K, V> lookupBroadcastState(
            ExecutionConfig config,
            Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states,
            MapStateDescriptor<K, V> stateDescriptor) {
        Preconditions.checkNotNull(stateDescriptor);
        stateDescriptor.initializeSerializerUnlessSet(config);

        // The map is keyed by descriptor, so the entry found for a MapStateDescriptor<K, V> is
        // expected to be the BroadcastState<K, V> that open() created from that descriptor.
        @SuppressWarnings("unchecked")
        BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor);
        if (state == null) {
            throw new IllegalArgumentException(UNKNOWN_STATE_MESSAGE);
        }
        return state;
    }

    /** Context handed to {@code onTimer}; valid only while a timer is bound to it. */
    private class OnTimerContextImpl
            extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext {

        private final ExecutionConfig config;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final TimerService timerService;

        /** Domain of the timer currently firing, or {@code null} outside of {@code onTimer}. */
        private TimeDomain timeDomain;

        /** Timer currently firing, or {@code null} outside of {@code onTimer}. */
        private InternalTimer<KS, VoidNamespace> timer;

        OnTimerContextImpl(
                ExecutionConfig executionConfig,
                KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                TimerService timerService) {
            // The supertype is an inner class of the user function, so the enclosing instance
            // has to be supplied through a qualified superclass constructor invocation.
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        /**
         * Returns the timestamp of the firing timer.
         *
         * @throws IllegalStateException if no timer is currently bound
         */
        @Override
        public Long timestamp() {
            Preconditions.checkState(timer != null);
            return timer.getTimestamp();
        }

        /**
         * Returns the time domain of the firing timer.
         *
         * @throws IllegalStateException if no timer is currently bound
         */
        @Override
        public TimeDomain timeDomain() {
            Preconditions.checkState(timeDomain != null);
            return timeDomain;
        }

        /** Returns the key of the firing timer. */
        @Override
        public KS getCurrentKey() {
            return timer.getKey();
        }

        @Override
        public TimerService timerService() {
            return timerService;
        }

        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }

        /**
         * Emits {@code value} to the side output {@code outputTag}, stamped with the firing
         * timer's timestamp.
         *
         * @throws IllegalArgumentException if {@code outputTag} is {@code null}
         */
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            Preconditions.checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
        }

        /**
         * Returns the broadcast state registered for {@code stateDescriptor}.
         *
         * @throws IllegalArgumentException if no state is registered for the descriptor
         */
        @Override
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            return lookupBroadcastState(config, states, stateDescriptor);
        }
    }

    /** Context handed to {@code processElement}; valid only while an element is bound to it. */
    private class ReadOnlyContextImpl
            extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.ReadOnlyContext {

        private final ExecutionConfig config;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final TimerService timerService;

        /** Record currently being processed, or {@code null} outside of {@code processElement}. */
        private StreamRecord<IN1> element;

        ReadOnlyContextImpl(
                ExecutionConfig executionConfig,
                KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                TimerService timerService) {
            // Qualified superclass constructor invocation; see OnTimerContextImpl.
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        void setElement(StreamRecord<IN1> e) {
            this.element = e;
        }

        /**
         * Returns the timestamp of the current element, or {@code null} if it has none.
         *
         * @throws IllegalStateException if no element is currently bound
         */
        @Override
        public Long timestamp() {
            Preconditions.checkState(element != null);
            if (!element.hasTimestamp()) {
                return null;
            }
            return element.getTimestamp();
        }

        @Override
        public TimerService timerService() {
            return timerService;
        }

        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }

        /**
         * Emits {@code value} to the side output {@code outputTag}, stamped with the value of the
         * current element's {@code getTimestamp()}.
         *
         * @throws IllegalArgumentException if {@code outputTag} is {@code null}
         */
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            Preconditions.checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }

        /**
         * Returns the broadcast state registered for {@code stateDescriptor}.
         *
         * @throws IllegalArgumentException if no state is registered for the descriptor
         */
        @Override
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            return lookupBroadcastState(config, states, stateDescriptor);
        }

        /** Returns the key the enclosing operator currently reports. */
        @Override
        public KS getCurrentKey() {
            return CoBroadcastWithKeyedOperator.this.getCurrentKey();
        }
    }

    /**
     * Context handed to {@code processBroadcastElement}; valid only while an element is bound to
     * it.
     */
    private class ReadWriteContextImpl
            extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context {

        private final ExecutionConfig config;
        private final KeyedStateBackend<KS> keyedStateBackend;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final TimerService timerService;

        /**
         * Record currently being processed, or {@code null} outside of
         * {@code processBroadcastElement}.
         */
        private StreamRecord<IN2> element;

        ReadWriteContextImpl(
                ExecutionConfig executionConfig,
                KeyedStateBackend<KS> keyedStateBackend,
                KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                TimerService timerService) {
            // Qualified superclass constructor invocation; see OnTimerContextImpl.
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.keyedStateBackend = Preconditions.checkNotNull(keyedStateBackend);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        void setElement(StreamRecord<IN2> e) {
            this.element = e;
        }

        /**
         * Returns the value of the current element's {@code getTimestamp()}.
         *
         * @throws IllegalStateException if no element is currently bound
         */
        @Override
        public Long timestamp() {
            Preconditions.checkState(element != null);
            // NOTE(review): unlike ReadOnlyContextImpl.timestamp(), this does not consult
            // hasTimestamp() first. Confirm what getTimestamp() yields for records without a
            // timestamp before aligning the two; behavior is intentionally left unchanged here.
            return element.getTimestamp();
        }

        /**
         * Returns the writable broadcast state registered for {@code stateDescriptor}.
         *
         * @throws IllegalArgumentException if no state is registered for the descriptor
         */
        @Override
        public <K, V> BroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            return lookupBroadcastState(config, states, stateDescriptor);
        }

        /**
         * Emits {@code value} to the side output {@code outputTag}, stamped with the value of the
         * current element's {@code getTimestamp()}.
         *
         * @throws IllegalArgumentException if {@code outputTag} is {@code null}
         */
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            Preconditions.checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }

        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }

        /**
         * Delegates to the keyed state backend's {@code applyToAllKeys} for the state described by
         * {@code stateDescriptor}, using the {@link VoidNamespace}.
         *
         * @throws NullPointerException if {@code stateDescriptor} or {@code function} is
         *     {@code null}
         */
        @Override
        public <VS, S extends State> void applyToKeyedState(
                StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)
                throws Exception {
            keyedStateBackend.applyToAllKeys(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    Preconditions.checkNotNull(stateDescriptor),
                    Preconditions.checkNotNull(function));
        }
    }
}

