/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.co;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

/**
 * Two-input stream operator that runs a user-supplied {@link CoProcessFunction}.
 *
 * <p>Each element arriving on either input is handed to the matching
 * {@code processElement1}/{@code processElement2} callback of the user function together with a
 * context object and a timestamped collector. The operator remembers the timestamp of the last
 * watermark it processed so the context can report it. Registering or deleting timers through
 * the context is rejected with an {@link UnsupportedOperationException}.
 *
 * @param <IN1> type of the elements on the first input
 * @param <IN2> type of the elements on the second input
 * @param <OUT> type of the elements emitted by the user function
 */
@Internal
public class CoProcessOperator<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> {
    private static final long serialVersionUID = 1L;

    /** Collector handed to the user function; created in {@link #open()}. */
    private transient TimestampedCollector<OUT> collector;

    /** Context handed to the user function; created in {@link #open()}. */
    private transient ContextImpl context;

    /** Timestamp of the most recently processed watermark; {@code Long.MIN_VALUE} until one is seen. */
    private long currentWatermark = Long.MIN_VALUE;

    /**
     * Creates an operator wrapping the given user function.
     *
     * @param flatMapper the user function to invoke for every input element
     */
    public CoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
        super(flatMapper);
    }

    /**
     * Opens the operator and creates the collector and context that are passed to the user
     * function on every element.
     *
     * @throws Exception if the superclass {@code open()} fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        // Diamond instead of a raw type keeps the collector's element type checked.
        collector = new TimestampedCollector<>(output);
        context = new ContextImpl(userFunction, getProcessingTimeService());
    }

    /**
     * Passes an element of the first input to the user function.
     *
     * @param element the record to process
     * @throws Exception whatever the user function throws
     */
    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        collector.setTimestamp(element);
        // Expose the current record to the context only for the duration of the callback.
        context.element = element;
        userFunction.processElement1(element.getValue(), context, collector);
        context.element = null;
    }

    /**
     * Passes an element of the second input to the user function.
     *
     * @param element the record to process
     * @throws Exception whatever the user function throws
     */
    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        collector.setTimestamp(element);
        // Expose the current record to the context only for the duration of the callback.
        context.element = element;
        userFunction.processElement2(element.getValue(), context, collector);
        context.element = null;
    }

    /**
     * Delegates watermark handling to the superclass and then records the watermark's timestamp
     * so that {@link TimerService#currentWatermark()} on the context can report it.
     *
     * @param mark the watermark to process
     * @throws Exception if the superclass handling fails
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        currentWatermark = mark.getTimestamp();
    }

    /**
     * Context passed to the user function. It also serves as the {@link TimerService} returned
     * by {@link #timerService()}; all timer registration and deletion methods throw
     * {@link UnsupportedOperationException}.
     */
    private class ContextImpl
    extends CoProcessFunction<IN1, IN2, OUT>.Context
    implements TimerService {
        private final ProcessingTimeService timerService;

        /** Record currently being processed; {@code null} outside of a processElement call. */
        private StreamRecord<?> element;

        /**
         * @param function the user function that owns this context
         * @param timerService source of the current processing time; must not be {@code null}
         */
        ContextImpl(CoProcessFunction<IN1, IN2, OUT> function, ProcessingTimeService timerService) {
            // Context is an inner class of CoProcessFunction (per the Flink API), so the
            // superclass constructor needs the enclosing function instance.
            function.super();
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        /**
         * Returns the timestamp of the element being processed, or {@code null} if that element
         * carries no timestamp.
         *
         * @throws IllegalStateException if no element is currently being processed
         */
        @Override
        public Long timestamp() {
            Preconditions.checkState(element != null);
            if (!element.hasTimestamp()) {
                return null;
            }
            return element.getTimestamp();
        }

        /** Returns the current processing time as reported by the processing time service. */
        @Override
        public long currentProcessingTime() {
            return timerService.getCurrentProcessingTime();
        }

        /** Returns the timestamp of the last watermark processed by the enclosing operator. */
        @Override
        public long currentWatermark() {
            return CoProcessOperator.this.currentWatermark;
        }

        /** Always throws {@link UnsupportedOperationException}. */
        @Override
        public void registerProcessingTimeTimer(long time) {
            throw new UnsupportedOperationException("Setting timers is only supported on a keyed streams.");
        }

        /** Always throws {@link UnsupportedOperationException}. */
        @Override
        public void registerEventTimeTimer(long time) {
            throw new UnsupportedOperationException("Setting timers is only supported on a keyed streams.");
        }

        /** Always throws {@link UnsupportedOperationException}. */
        @Override
        public void deleteProcessingTimeTimer(long time) {
            throw new UnsupportedOperationException("Deleting timers is only supported on a keyed streams.");
        }

        /** Always throws {@link UnsupportedOperationException}. */
        @Override
        public void deleteEventTimeTimer(long time) {
            throw new UnsupportedOperationException("Deleting timers is only supported on a keyed streams.");
        }

        /** Returns this context, which doubles as the timer service. */
        @Override
        public TimerService timerService() {
            return this;
        }

        /**
         * Emits {@code value} to the side output identified by {@code outputTag}, using the
         * timestamp returned by the current element's {@code getTimestamp()}.
         *
         * @param outputTag tag of the side output; must not be {@code null}
         * @param value the value to emit
         * @throws IllegalArgumentException if {@code outputTag} is {@code null}
         * @throws IllegalStateException if no element is currently being processed
         */
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            if (outputTag == null) {
                throw new IllegalArgumentException("OutputTag must not be null.");
            }
            // Same guard as timestamp(): fail with a state error instead of a bare
            // NullPointerException when called outside of processElement1/2.
            Preconditions.checkState(element != null);
            // NOTE(review): getTimestamp() is used even when hasTimestamp() is false — confirm
            // that the resulting record timestamp is what downstream operators expect.
            CoProcessOperator.this.output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }
    }
}

