/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing.operators.co;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
import org.apache.flink.runtime.asyncprocessing.functions.DeclaringAsyncKeyedCoProcessFunction;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.runtime.asyncprocessing.operators.TimestampedCollectorWithDeclaredVariable;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

/**
 * Two-input keyed stream operator that executes a {@link KeyedCoProcessFunction} on top of
 * {@link AbstractAsyncStateUdfStreamOperator}.
 *
 * <p>If the user function is a {@link DeclaringAsyncKeyedCoProcessFunction}, its declared
 * processors are obtained once in {@link #open()}; otherwise the plain {@code processElement1},
 * {@code processElement2} and {@code onTimer} callbacks are wrapped as processors. In both cases
 * the per-record and per-timer entry points only set up the timestamp and then delegate to the
 * prepared processor.
 *
 * <p>Timestamps and the timer's time domain are kept in {@link DeclaredVariable}s rather than in
 * plain fields. NOTE(review): presumably this lets the values follow the record through
 * asynchronous callbacks; confirm against the {@code DeclaredVariable} documentation.
 *
 * @param <K> type of the key
 * @param <IN1> type of the elements of the first input
 * @param <IN2> type of the elements of the second input
 * @param <OUT> type of the emitted elements
 */
@Internal
public class AsyncKeyedCoProcessOperator<K, IN1, IN2, OUT>
        extends AbstractAsyncStateUdfStreamOperator<OUT, KeyedCoProcessFunction<K, IN1, IN2, OUT>>
        implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {

    private static final long serialVersionUID = 1L;

    /** Timestamp variable shared by {@link #collector} and {@link #context}. */
    private transient DeclaredVariable<Long> sharedTimestamp;

    /** Collector handed to the user function for main-output records. */
    private transient TimestampedCollectorWithDeclaredVariable<OUT> collector;

    /** Context passed to the element-processing callbacks. */
    private transient ContextImpl<K, IN1, IN2, OUT> context;

    /** Context passed to the timer callback. */
    private transient OnTimerContextImpl<K, IN1, IN2, OUT> onTimerContext;

    /** Processor for values of the first input, prepared in {@link #open()}. */
    private transient ThrowingConsumer<IN1, Exception> processor1;

    /** Processor for values of the second input, prepared in {@link #open()}. */
    private transient ThrowingConsumer<IN2, Exception> processor2;

    /** Processor for fired timers (argument is the timer timestamp), prepared in {@link #open()}. */
    private transient ThrowingConsumer<Long, Exception> timerProcessor;

    /**
     * Creates the operator around the given user function.
     *
     * @param keyedCoProcessFunction the user function to execute
     */
    public AsyncKeyedCoProcessOperator(
            KeyedCoProcessFunction<K, IN1, IN2, OUT> keyedCoProcessFunction) {
        super(keyedCoProcessFunction);
    }

    /**
     * Declares the shared timestamp variable, creates the collector, the timer service and both
     * contexts, and prepares the three processors.
     *
     * @throws Exception if the parent {@code open()} or any declaration fails
     */
    @Override
    public void open() throws Exception {
        super.open();

        sharedTimestamp =
                declarationContext.declareVariable(
                        LongSerializer.INSTANCE,
                        "_AsyncCoKeyedProcessOperator$sharedTimestamp",
                        null);
        collector = new TimestampedCollectorWithDeclaredVariable<>(output, sharedTimestamp);

        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        TimerService timerService = new SimpleTimerService(internalTimerService);

        context = new ContextImpl<>(userFunction, timerService, sharedTimestamp);
        onTimerContext = new OnTimerContextImpl<>(userFunction, timerService, declarationContext);

        if (userFunction instanceof DeclaringAsyncKeyedCoProcessFunction) {
            // Declaring functions build their processing logic once, up front.
            DeclaringAsyncKeyedCoProcessFunction<K, IN1, IN2, OUT> declaringFunction =
                    (DeclaringAsyncKeyedCoProcessFunction<K, IN1, IN2, OUT>) userFunction;
            declaringFunction.declareVariables(declarationContext);
            processor1 = declaringFunction.declareProcess1(declarationContext, context, collector);
            processor2 = declaringFunction.declareProcess2(declarationContext, context, collector);
            timerProcessor =
                    declaringFunction.declareOnTimer(declarationContext, onTimerContext, collector);
        } else {
            // Plain functions are invoked directly for every record / timer.
            processor1 = in -> userFunction.processElement1(in, context, collector);
            processor2 = in -> userFunction.processElement2(in, context, collector);
            timerProcessor = in -> userFunction.onTimer(in, onTimerContext, collector);
        }
    }

    /**
     * Processes one record of the first input: hands the record to the collector so it can pick
     * up the timestamp, then runs the first-input processor on the record's value.
     *
     * @param element the incoming record
     * @throws Exception if the processor fails
     */
    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        collector.setTimestamp(element);
        processor1.accept(element.getValue());
    }

    /**
     * Processes one record of the second input: hands the record to the collector so it can pick
     * up the timestamp, then runs the second-input processor on the record's value.
     *
     * @param element the incoming record
     * @throws Exception if the processor fails
     */
    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        collector.setTimestamp(element);
        processor2.accept(element.getValue());
    }

    /**
     * Handles a fired event-time timer. The collector's timestamp is set to the timer's timestamp
     * before the timer processor runs.
     *
     * @param timer the fired timer
     * @throws Exception if the timer processor fails
     */
    @Override
    public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.setAbsoluteTimestamp(timer.getTimestamp());
        invokeUserFunction(TimeDomain.EVENT_TIME, timer);
    }

    /**
     * Handles a fired processing-time timer. The collector's timestamp is erased before the timer
     * processor runs.
     *
     * @param timer the fired timer
     * @throws Exception if the timer processor fails
     */
    @Override
    public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.eraseTimestamp();
        invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
    }

    /**
     * Records the timer's timestamp and time domain in the timer context and then runs the timer
     * processor with the timer's timestamp.
     */
    private void invokeUserFunction(TimeDomain timeDomain, InternalTimer<K, VoidNamespace> timer)
            throws Exception {
        long firedAt = timer.getTimestamp();
        onTimerContext.setTime(firedAt, timeDomain);
        timerProcessor.accept(firedAt);
    }

    /**
     * Context handed to the user function's timer callback. Keeps the fired timer's timestamp and
     * time domain in its own declared variables.
     */
    private class OnTimerContextImpl<K, IN1, IN2, OUT>
            extends KeyedCoProcessFunction<K, IN1, IN2, OUT>.OnTimerContext {

        private final TimerService timerService;

        /** Name of the {@link TimeDomain} constant; stored as a string via StringSerializer. */
        private final DeclaredVariable<String> timeDomain;

        private final DeclaredVariable<Long> timestamp;

        OnTimerContextImpl(
                KeyedCoProcessFunction<K, IN1, IN2, OUT> function,
                TimerService timerService,
                DeclarationContext declarationContext) {
            // The superclass is an inner class of the function, so it needs the function as its
            // enclosing instance.
            function.super();
            this.timerService = Preconditions.checkNotNull(timerService);
            this.timeDomain =
                    declarationContext.declareVariable(
                            StringSerializer.INSTANCE, "_OnTimerContextImpl$timeDomain", null);
            this.timestamp =
                    declarationContext.declareVariable(
                            LongSerializer.INSTANCE, "_OnTimerContextImpl$timestamp", null);
        }

        /**
         * Stores the timestamp and time domain of the timer that is about to be delivered.
         *
         * @param time timestamp of the fired timer
         * @param domain time domain of the fired timer
         */
        public void setTime(long time, TimeDomain domain) {
            timestamp.set(time);
            timeDomain.set(domain.name());
        }

        /**
         * Returns the timestamp of the fired timer.
         *
         * @throws IllegalStateException if no timestamp has been stored
         */
        @Override
        public Long timestamp() {
            Long ts = timestamp.get();
            Preconditions.checkState(ts != null);
            return ts;
        }

        @Override
        public TimerService timerService() {
            return timerService;
        }

        /**
         * Emits {@code value} to the side output identified by {@code outputTag}, stamped with the
         * fired timer's timestamp.
         *
         * @throws IllegalArgumentException if {@code outputTag} is null
         * @throws IllegalStateException if no timer timestamp has been stored
         */
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            if (outputTag == null) {
                throw new IllegalArgumentException("OutputTag must not be null.");
            }
            AsyncKeyedCoProcessOperator.this.output.collect(
                    outputTag, new StreamRecord<>(value, timestamp()));
        }

        /**
         * Returns the time domain of the fired timer.
         *
         * @throws IllegalStateException if no time domain has been stored
         */
        @Override
        public TimeDomain timeDomain() {
            String domainName = timeDomain.get();
            Preconditions.checkState(domainName != null);
            return TimeDomain.valueOf(domainName);
        }

        // This class's K shadows the operator's K, so the cast cannot be checked by the compiler.
        @Override
        @SuppressWarnings("unchecked")
        public K getCurrentKey() {
            return (K) AsyncKeyedCoProcessOperator.this.getCurrentKey();
        }
    }

    /**
     * Context handed to the user function's element callbacks. Reads the current timestamp from
     * the declared variable it is constructed with.
     */
    public class ContextImpl<K, IN1, IN2, OUT>
            extends KeyedCoProcessFunction<K, IN1, IN2, OUT>.Context {

        private final TimerService timerService;

        private final DeclaredVariable<Long> timestamp;

        ContextImpl(
                KeyedCoProcessFunction<K, IN1, IN2, OUT> function,
                TimerService timerService,
                DeclaredVariable<Long> timestamp) {
            // The superclass is an inner class of the function, so it needs the function as its
            // enclosing instance.
            function.super();
            this.timerService = Preconditions.checkNotNull(timerService);
            this.timestamp = timestamp;
        }

        /** Returns the current value of the timestamp variable, which may be {@code null}. */
        @Override
        public Long timestamp() {
            return timestamp.get();
        }

        @Override
        public TimerService timerService() {
            return timerService;
        }

        /**
         * Emits {@code value} to the side output identified by {@code outputTag}. The record
         * carries the current timestamp if one is set; otherwise it is emitted without a
         * timestamp instead of failing while unboxing a null value.
         *
         * @throws IllegalArgumentException if {@code outputTag} is null
         */
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            if (outputTag == null) {
                throw new IllegalArgumentException("OutputTag must not be null.");
            }
            Long ts = timestamp.get();
            StreamRecord<X> record =
                    ts == null ? new StreamRecord<>(value) : new StreamRecord<>(value, ts);
            AsyncKeyedCoProcessOperator.this.output.collect(outputTag, record);
        }

        // This class's K shadows the operator's K, so the cast cannot be checked by the compiler.
        @Override
        @SuppressWarnings("unchecked")
        public K getCurrentKey() {
            return (K) AsyncKeyedCoProcessOperator.this.getCurrentKey();
        }
    }
}

