package org.apache.flink.streaming.api.operators.co;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.class */
public class IntervalJoinOperator<K, T1, T2, OUT> extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>> implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {
    private static final long serialVersionUID = -5380774605111543454L;
    private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class);
    // Names of the two keyed map states registered in initializeState.
    private static final String LEFT_BUFFER = "LEFT_BUFFER";
    private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
    // Name under which the internal timer service is obtained in open().
    private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
    // Timer namespaces; onEventTime uses them to decide which buffer a fired timer cleans up.
    private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
    private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
    // Bounds of the join interval relative to a left element's timestamp. The constructor
    // shifts an exclusive bound by one, so both values are treated as inclusive afterwards.
    private final long lowerBound;
    private final long upperBound;
    // Optional side-output tags for late elements; sideOutput skips emission when a tag is null.
    private final OutputTag<T1> leftLateDataOutputTag;
    private final OutputTag<T2> rightLateDataOutputTag;
    // Serializers for the buffered elements of each input; wrapped in BufferEntrySerializer.
    private final TypeSerializer<T1> leftTypeSerializer;
    private final TypeSerializer<T2> rightTypeSerializer;
    // Per-key buffers mapping an element timestamp to all elements seen with that timestamp.
    private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
    private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
    // Runtime helpers created in open(); transient, so not part of the serialized operator.
    private transient TimestampedCollector<OUT> collector;
    private transient IntervalJoinOperator<K, T1, T2, OUT>.ContextImpl context;
    private transient InternalTimerService<String> internalTimerService;

    /**
     * Immutable holder stored in the join buffers: one stream element together with a flag
     * recording whether it has been joined.
     *
     * @param <T> type of the buffered element
     */
    @VisibleForTesting
    @Internal
    public static class BufferEntry<T> {
        private final T element;
        private final boolean hasBeenJoined;

        /**
         * @param t the element to buffer
         * @param z value of the "has been joined" flag
         */
        public BufferEntry(T t, boolean z) {
            element = t;
            hasBeenJoined = z;
        }

        /** Returns the buffered element. */
        @VisibleForTesting
        public T getElement() {
            return element;
        }

        /** Returns the "has been joined" flag this entry was created with. */
        @VisibleForTesting
        public boolean hasBeenJoined() {
            return hasBeenJoined;
        }
    }

    /**
     * Serializer for {@link BufferEntry} that delegates the element to a nested serializer.
     *
     * <p>Binary layout: one boolean ({@code hasBeenJoined}) followed by the element as written
     * by the nested element serializer. Every read path must consume the fields in that order.
     *
     * @param <T> type of the element wrapped by the buffer entry
     */
    @VisibleForTesting
    @Internal
    public static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
        private static final long serialVersionUID = -20197698803836236L;
        private final TypeSerializer<T> elementSerializer;

        /**
         * @param typeSerializer serializer for the wrapped element; checked with
         *     {@code Preconditions.checkNotNull}
         */
        public BufferEntrySerializer(TypeSerializer<T> typeSerializer) {
            this.elementSerializer = Preconditions.checkNotNull(typeSerializer);
        }

        /** Always reports {@code true}. */
        public boolean isImmutableType() {
            return true;
        }

        /** Returns a new serializer wrapping a duplicate of the element serializer. */
        public TypeSerializer<BufferEntry<T>> duplicate() {
            return new BufferEntrySerializer<>(this.elementSerializer.duplicate());
        }

        /* renamed from: createInstance, reason: merged with bridge method [inline-methods] */
        /** Always returns {@code null}; no empty instance is created. */
        public BufferEntry<T> m945createInstance() {
            return null;
        }

        /** Returns a new entry referencing the same element and carrying the same flag. */
        public BufferEntry<T> copy(BufferEntry<T> bufferEntry) {
            return new BufferEntry<>(bufferEntry.element, bufferEntry.hasBeenJoined);
        }

        /** Ignores the reuse instance and delegates to {@link #copy(BufferEntry)}. */
        public BufferEntry<T> copy(BufferEntry<T> bufferEntry, BufferEntry<T> bufferEntry2) {
            return copy(bufferEntry);
        }

        /** Returns {@code -1}. */
        public int getLength() {
            return -1;
        }

        /** Writes the flag first, then the element. */
        public void serialize(BufferEntry<T> bufferEntry, DataOutputView dataOutputView) throws IOException {
            dataOutputView.writeBoolean(bufferEntry.hasBeenJoined);
            this.elementSerializer.serialize(bufferEntry.element, dataOutputView);
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        /**
         * Reads one entry in the order {@link #serialize} wrote it: flag first, then element.
         *
         * @throws IOException if reading from the input view fails
         */
        public BufferEntry<T> m944deserialize(DataInputView dataInputView) throws IOException {
            // Read into locals so the read order is explicit rather than depending on
            // argument evaluation order; the flag must be consumed before the element.
            final boolean hasBeenJoined = dataInputView.readBoolean();
            final T element = this.elementSerializer.deserialize(dataInputView);
            return new BufferEntry<>(element, hasBeenJoined);
        }

        /** Ignores the reuse instance and reads a fresh entry. */
        public BufferEntry<T> deserialize(BufferEntry<T> bufferEntry, DataInputView dataInputView) throws IOException {
            return m944deserialize(dataInputView);
        }

        /** Copies one serialized entry from input to output: flag first, then element. */
        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
            dataOutputView.writeBoolean(dataInputView.readBoolean());
            this.elementSerializer.copy(dataInputView, dataOutputView);
        }

        /** Two serializers are equal when they have the same class and equal element serializers. */
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Objects.equals(this.elementSerializer, ((BufferEntrySerializer<?>) obj).elementSerializer);
        }

        public int hashCode() {
            return Objects.hash(this.elementSerializer);
        }

        /** Returns a {@link BufferEntrySerializerSnapshot} built from this serializer. */
        public TypeSerializerSnapshot<BufferEntry<T>> snapshotConfiguration() {
            return new BufferEntrySerializerSnapshot<>(this);
        }
    }

    /**
     * Snapshot of a {@link BufferEntrySerializer}; the element serializer is its only nested
     * serializer.
     *
     * @param <T> type of the element wrapped by the buffer entry
     */
    public static final class BufferEntrySerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<BufferEntry<T>, BufferEntrySerializer<T>> {
        private static final int VERSION = 2;

        // NOTE(review): public no-arg constructor is presumably required so the snapshot can be
        // instantiated reflectively on restore — confirm against the snapshot framework.
        public BufferEntrySerializerSnapshot() {
        }

        BufferEntrySerializerSnapshot(BufferEntrySerializer<T> bufferEntrySerializer) {
            super(bufferEntrySerializer);
        }

        /** Returns the snapshot version declared by {@code VERSION}. */
        protected int getCurrentOuterSnapshotVersion() {
            return VERSION;
        }

        /** Returns a one-element array holding the element serializer of the given serializer. */
        public TypeSerializer<?>[] getNestedSerializers(BufferEntrySerializer<T> bufferEntrySerializer) {
            return new TypeSerializer<?>[]{bufferEntrySerializer.elementSerializer};
        }

        /**
         * Rebuilds the outer serializer from the nested serializers.
         *
         * @param typeSerializerArr nested serializers; index 0 is used as the element serializer
         */
        @SuppressWarnings("unchecked")
        protected BufferEntrySerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] typeSerializerArr) {
            // Unchecked: the array is typed with wildcards, but index 0 is the slot that
            // getNestedSerializers fills with the TypeSerializer<T> element serializer.
            return new BufferEntrySerializer<>((TypeSerializer<T>) typeSerializerArr[0]);
        }

        /* renamed from: createOuterSerializerWithNestedSerializers, reason: collision with other method in class */
        protected /* bridge */ /* synthetic */ TypeSerializer m946createOuterSerializerWithNestedSerializers(TypeSerializer[] typeSerializerArr) {
            return createOuterSerializerWithNestedSerializers((TypeSerializer<?>[]) typeSerializerArr);
        }
    }

    /**
     * Context handed to the user's {@link ProcessJoinFunction} for each joined pair. It exposes
     * the timestamps of the left element, the right element and the emitted result, and allows
     * emitting to side outputs. The timestamps are overwritten before every user-function call
     * via {@link #updateTimestamps}.
     */
    public final class ContextImpl extends ProcessJoinFunction<T1, T2, OUT>.Context {
        // Long.MIN_VALUE until the first call to updateTimestamps.
        private long resultTimestamp = Long.MIN_VALUE;
        private long leftTimestamp = Long.MIN_VALUE;
        private long rightTimestamp = Long.MIN_VALUE;

        /**
         * @param processJoinFunction the function instance that serves as the enclosing instance
         *     of the inherited inner class {@code Context}; must not be null
         */
        private ContextImpl(ProcessJoinFunction<T1, T2, OUT> processJoinFunction) {
            // Context is an inner class of ProcessJoinFunction, so the superclass constructor
            // needs an explicit enclosing instance. A qualified invocation on a null reference
            // throws NullPointerException, so no separate null check is needed.
            processJoinFunction.super();
        }

        /** Stores the timestamps of the pair about to be processed and of its result. */
        private void updateTimestamps(long j, long j2, long j3) {
            this.leftTimestamp = j;
            this.rightTimestamp = j2;
            this.resultTimestamp = j3;
        }

        @Override // org.apache.flink.streaming.api.functions.co.ProcessJoinFunction.Context
        public long getLeftTimestamp() {
            return this.leftTimestamp;
        }

        @Override // org.apache.flink.streaming.api.functions.co.ProcessJoinFunction.Context
        public long getRightTimestamp() {
            return this.rightTimestamp;
        }

        @Override // org.apache.flink.streaming.api.functions.co.ProcessJoinFunction.Context
        public long getTimestamp() {
            return this.resultTimestamp;
        }

        /**
         * Emits a record to the given side output, stamped with the current result timestamp.
         *
         * @throws IllegalArgumentException if {@code outputTag} is null (NOTE(review): exception
         *     type as raised by {@code Preconditions.checkArgument} — confirm)
         */
        @Override // org.apache.flink.streaming.api.functions.co.ProcessJoinFunction.Context
        public <X> void output(OutputTag<X> outputTag, X x) {
            Preconditions.checkArgument(outputTag != null, "OutputTag must not be null");
            IntervalJoinOperator.this.output.collect(outputTag, new StreamRecord<>(x, getTimestamp()));
        }
    }

    /**
     * Creates the operator.
     *
     * @param j lower bound of the join interval, relative to the left element's timestamp
     * @param j2 upper bound of the join interval, relative to the left element's timestamp
     * @param z whether the lower bound is inclusive
     * @param z2 whether the upper bound is inclusive
     * @param outputTag side-output tag for late left elements, may be null
     * @param outputTag2 side-output tag for late right elements, may be null
     * @param typeSerializer serializer for left elements, must not be null
     * @param typeSerializer2 serializer for right elements, must not be null
     * @param processJoinFunction user function invoked per joined pair, must not be null
     */
    public IntervalJoinOperator(long j, long j2, boolean z, boolean z2, OutputTag<T1> outputTag, OutputTag<T2> outputTag2, TypeSerializer<T1> typeSerializer, TypeSerializer<T2> typeSerializer2, ProcessJoinFunction<T1, T2, OUT> processJoinFunction) {
        super(Preconditions.checkNotNull(processJoinFunction));
        Preconditions.checkArgument(j <= j2, "lowerBound <= upperBound must be fulfilled");

        // Shift exclusive bounds inward by one so later comparisons can treat both as inclusive.
        if (z) {
            lowerBound = j;
        } else {
            lowerBound = j + 1;
        }
        if (z2) {
            upperBound = j2;
        } else {
            upperBound = j2 - 1;
        }

        leftLateDataOutputTag = outputTag;
        rightLateDataOutputTag = outputTag2;
        leftTypeSerializer = Preconditions.checkNotNull(typeSerializer);
        rightTypeSerializer = Preconditions.checkNotNull(typeSerializer2);
    }

    /**
     * Creates the runtime helpers after the superclass has opened: the timestamped collector
     * around the operator output, the context passed to the user function, and the timer
     * service used for cleanup timers.
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();

        collector = new TimestampedCollector<>(output);
        context = new ContextImpl(userFunction);
        // This operator is passed as the Triggerable that receives the timer callbacks.
        internalTimerService = getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
    }

    /**
     * Registers the left and right buffers as keyed map states. Each maps an element timestamp
     * to the list of buffer entries observed with that timestamp.
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);

        final MapStateDescriptor<Long, List<BufferEntry<T1>>> leftDescriptor =
                new MapStateDescriptor<>(
                        LEFT_BUFFER,
                        LongSerializer.INSTANCE,
                        new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer)));
        leftBuffer = stateInitializationContext.getKeyedStateStore().getMapState(leftDescriptor);

        final MapStateDescriptor<Long, List<BufferEntry<T2>>> rightDescriptor =
                new MapStateDescriptor<>(
                        RIGHT_BUFFER,
                        LongSerializer.INSTANCE,
                        new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer)));
        rightBuffer = stateInitializationContext.getKeyedStateStore().getMapState(rightDescriptor);
    }

    /**
     * Handles an element of the first (left) input: it is buffered on the left side and joined
     * against the right buffer using the interval {@code [lowerBound, upperBound]}.
     */
    @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
    public void processElement1(StreamRecord<T1> streamRecord) throws Exception {
        final boolean isLeft = true;
        processElement(streamRecord, leftBuffer, rightBuffer, lowerBound, upperBound, isLeft);
    }

    /**
     * Handles an element of the second (right) input: it is buffered on the right side and
     * joined against the left buffer. Seen from a right element the interval is mirrored, so the
     * relative bounds passed on are {@code [-upperBound, -lowerBound]}.
     */
    @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
    public void processElement2(StreamRecord<T2> streamRecord) throws Exception {
        final long relativeLowerBound = -upperBound;
        final long relativeUpperBound = -lowerBound;
        processElement(streamRecord, rightBuffer, leftBuffer, relativeLowerBound, relativeUpperBound, false);
    }

    /**
     * Shared join logic for both inputs. A non-late element is added to its own side's buffer,
     * joined with every element of the other buffer whose timestamp lies within
     * {@code [timestamp + j, timestamp + j2]}, and an event-time cleanup timer is registered
     * for it. Late elements are passed to {@link #sideOutput} and not buffered.
     *
     * @param streamRecord the incoming record
     * @param mapState buffer of the side the record belongs to
     * @param mapState2 buffer of the opposite side
     * @param j lower bound of matching timestamps, relative to the record's timestamp
     * @param j2 upper bound of matching timestamps, relative to the record's timestamp
     * @param z {@code true} if the record comes from the left input
     * @throws FlinkException if the record's timestamp is {@code Long.MIN_VALUE}
     */
    @SuppressWarnings("unchecked")
    private <THIS, OTHER> void processElement(StreamRecord<THIS> streamRecord, MapState<Long, List<BufferEntry<THIS>>> mapState, MapState<Long, List<BufferEntry<OTHER>>> mapState2, long j, long j2, boolean z) throws Exception {
        final THIS ourValue = streamRecord.getValue();
        final long ourTimestamp = streamRecord.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in interval stream joins need to have timestamps meaningful timestamps.");
        }

        if (isLate(ourTimestamp)) {
            sideOutput(ourValue, ourTimestamp, z);
            return;
        }

        addToBuffer(mapState, ourValue, ourTimestamp);

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : mapState2.entries()) {
            final long otherTimestamp = bucket.getKey();
            if (otherTimestamp < ourTimestamp + j || otherTimestamp > ourTimestamp + j2) {
                continue;
            }

            for (BufferEntry<OTHER> entry : bucket.getValue()) {
                // Unchecked casts: when z is true THIS/OTHER are T1/T2, otherwise T2/T1
                // (see processElement1 and processElement2).
                if (z) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, otherTimestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, otherTimestamp, ourTimestamp);
                }
            }
        }

        // With a non-positive upper bound the timer is set at the element's own timestamp.
        final long cleanupTime = j2 > 0 ? ourTimestamp + j2 : ourTimestamp;
        if (z) {
            this.internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            this.internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

    /**
     * Tells whether a timestamp is strictly behind the timer service's current watermark.
     *
     * @param j element timestamp
     * @return {@code true} if the current watermark is greater than {@code j}
     */
    private boolean isLate(long j) {
        final long currentWatermark = internalTimerService.currentWatermark();
        return currentWatermark > j;
    }

    /**
     * Emits a late element to the side output of its input, if a tag was configured for that
     * input; otherwise the element is dropped.
     *
     * @param t the late element
     * @param j timestamp attached to the emitted record
     * @param z {@code true} if the element comes from the left input
     */
    @SuppressWarnings("unchecked")
    protected <T> void sideOutput(T t, long j, boolean z) {
        // Unchecked casts: T is the left type when z is true and the right type otherwise.
        if (z) {
            if (this.leftLateDataOutputTag != null) {
                this.output.collect(this.leftLateDataOutputTag, new StreamRecord<>((T1) t, j));
            }
        } else if (this.rightLateDataOutputTag != null) {
            this.output.collect(this.rightLateDataOutputTag, new StreamRecord<>((T2) t, j));
        }
    }

    /**
     * Invokes the user function for one joined pair. The result timestamp is the larger of the
     * two element timestamps; it is set on the collector and published through the context
     * before the call.
     *
     * @param t1 left element
     * @param t2 right element
     * @param j timestamp of the left element
     * @param j2 timestamp of the right element
     */
    private void collect(T1 t1, T2 t2, long j, long j2) throws Exception {
        final long resultTimestamp = Math.max(j, j2);

        collector.setAbsoluteTimestamp(resultTimestamp);
        context.updateTimestamps(j, j2, resultTimestamp);

        userFunction.processElement(t1, t2, context, collector);
    }

    /**
     * Appends an element to the list stored under its timestamp, creating the list when the
     * timestamp has no entry yet, and writes the list back to the state.
     *
     * @param mapState buffer to append to
     * @param t element to buffer
     * @param j timestamp used as the map key
     */
    private static <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> mapState, T t, long j) throws Exception {
        final Long key = Long.valueOf(j);

        List<BufferEntry<T>> entries = mapState.get(key);
        if (entries == null) {
            entries = new ArrayList<>();
        }

        entries.add(new BufferEntry<>(t, false));
        mapState.put(key, entries);
    }

    /**
     * Cleanup callback for event-time timers. The timer's namespace selects the buffer, and the
     * bucket to remove is derived from the timer timestamp and the corresponding bound.
     *
     * @param internalTimer the fired timer
     * @throws RuntimeException if the timer's namespace is neither of the two cleanup namespaces
     */
    @Override // org.apache.flink.streaming.api.operators.Triggerable
    public void onEventTime(InternalTimer<K, String> internalTimer) throws Exception {
        final long timerTimestamp = internalTimer.getTimestamp();
        final String namespace = internalTimer.getNamespace();
        logger.trace("onEventTime @ {}", timerTimestamp);

        switch (namespace) {
            case CLEANUP_NAMESPACE_LEFT: {
                // Left timers were registered at (timestamp + upperBound) when upperBound > 0,
                // so subtracting it recovers the buffered element's timestamp.
                final long bucket = this.upperBound <= 0 ? timerTimestamp : timerTimestamp - this.upperBound;
                logger.trace("Removing from left buffer @ {}", bucket);
                this.leftBuffer.remove(bucket);
                break;
            }
            case CLEANUP_NAMESPACE_RIGHT: {
                // Right timers were registered at (timestamp - lowerBound) when lowerBound < 0,
                // so adding it recovers the buffered element's timestamp.
                final long bucket = this.lowerBound <= 0 ? timerTimestamp + this.lowerBound : timerTimestamp;
                logger.trace("Removing from right buffer @ {}", bucket);
                this.rightBuffer.remove(bucket);
                break;
            }
            default:
                throw new RuntimeException("Invalid namespace " + namespace);
        }
    }

    /**
     * Intentionally empty: the operator only registers event-time timers (see
     * {@code processElement}), so there is nothing to do for processing-time timers.
     */
    @Override // org.apache.flink.streaming.api.operators.Triggerable
    public void onProcessingTime(InternalTimer<K, String> internalTimer) throws Exception {
    }

    /** Exposes the left buffer state so tests can inspect it. */
    @VisibleForTesting
    MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() {
        return leftBuffer;
    }

    /** Exposes the right buffer state so tests can inspect it. */
    @VisibleForTesting
    MapState<Long, List<BufferEntry<T2>>> getRightBuffer() {
        return rightBuffer;
    }
}
