package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ScheduledFuture;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

@Deprecated
/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamSourceContexts.class */
public class StreamSourceContexts {

    /**
     * Source context that stamps every collected record with the current processing time and
     * emits watermarks derived from that clock.
     *
     * <p>Watermarks are produced on two paths: inline while a record is collected (see
     * {@link #processAndCollect(Object)}) and from a {@link WatermarkEmittingTask} timer that
     * re-registers itself every {@code watermarkInterval}. In both cases the watermark timestamp
     * is the clock reading minus its remainder modulo {@code watermarkInterval}.
     *
     * <p>Watermarks passed in by the source function are dropped unless they carry
     * {@code Long.MAX_VALUE} (see {@link #allowWatermark(Watermark)}).
     */
    private static class AutomaticWatermarkContext<T> extends WatermarkContext<T> {
        /** Downstream output that receives records, watermarks and watermark statuses. */
        private final Output<StreamRecord<T>> output;
        /** Single record instance that is re-filled for every emitted element. */
        private final StreamRecord<T> reuse;
        /** Distance between two automatic watermarks; validated to be at least 1 in the constructor. */
        private final long watermarkInterval;
        /** Handle of the most recently registered watermark timer, kept so it can be cancelled. */
        private volatile ScheduledFuture<?> nextWatermarkTimer;
        /**
         * Clock value that has to be exceeded before another automatic watermark is emitted. Never
         * assigned in the constructor, so it starts at 0; set to {@code Long.MAX_VALUE} by
         * {@link #processAndEmitWatermark(Watermark)}.
         */
        private volatile long nextWatermarkTime;
        /** Processing time at which the last record was collected; {@code Long.MIN_VALUE} until then. */
        private long lastRecordTime;
        /** Idle flag of the last watermark status handled by {@link #processAndEmitWatermarkStatus}. */
        private boolean idle;

        /**
         * Timer callback that emits the periodic watermark (or flags the source as idle) and then
         * schedules its own successor.
         */
        private class WatermarkEmittingTask implements ProcessingTimeService.ProcessingTimeCallback {
            private final org.apache.flink.streaming.runtime.tasks.ProcessingTimeService timeService;
            private final Object lock;
            private final Output<StreamRecord<T>> output;

            private WatermarkEmittingTask(org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService, Object obj, Output<StreamRecord<T>> output) {
                this.timeService = processingTimeService;
                this.lock = obj;
                this.output = output;
            }

            /**
             * Runs one watermark tick.
             *
             * @param j timestamp handed over by the timer service; not used, the clock is read
             *     again from the time service instead
             */
            public void onProcessingTime(long j) {
                long currentProcessingTime = this.timeService.getCurrentProcessingTime();
                synchronized (this.lock) {
                    // Nothing is emitted while the context is flagged idle.
                    if (!AutomaticWatermarkContext.this.idle) {
                        // Idleness check piggy-backed on the watermark tick: only when a timeout is
                        // configured (-1 disables it) and no record arrived within that timeout.
                        // NOTE(review): while lastRecordTime is still Long.MIN_VALUE the subtraction
                        // wraps around to a negative value for non-negative clock readings, so this
                        // branch is not taken before the first record - confirm that is intended.
                        if (AutomaticWatermarkContext.this.idleTimeout != -1 && currentProcessingTime - AutomaticWatermarkContext.this.lastRecordTime > AutomaticWatermarkContext.this.idleTimeout) {
                            AutomaticWatermarkContext.this.markAsTemporarilyIdle();
                            // The separately scheduled idleness check is cancelled as well.
                            AutomaticWatermarkContext.this.cancelNextIdleDetectionTask();
                        } else if (currentProcessingTime > AutomaticWatermarkContext.this.nextWatermarkTime) {
                            // Strip the remainder so the watermark falls on an interval boundary.
                            long j2 = currentProcessingTime - (currentProcessingTime % AutomaticWatermarkContext.this.watermarkInterval);
                            this.output.emitWatermark(new Watermark(j2));
                            AutomaticWatermarkContext.this.nextWatermarkTime = j2 + AutomaticWatermarkContext.this.watermarkInterval;
                        }
                    }
                }
                // The follow-up timer is registered outside the lock and regardless of the idle
                // state, one interval after the clock value read at the top of this method.
                long j3 = currentProcessingTime + AutomaticWatermarkContext.this.watermarkInterval;
                AutomaticWatermarkContext.this.nextWatermarkTimer = this.timeService.registerTimer(j3, new WatermarkEmittingTask(this.timeService, this.lock, this.output));
            }
        }

        /**
         * Creates the context and registers the first watermark timer one interval from now.
         *
         * @param output downstream output; must not be null
         * @param j watermark interval; must be at least 1 (milliseconds, per the error message)
         * @param processingTimeService clock and timer service, passed on to the super constructor
         * @param obj checkpoint lock, passed on to the super constructor and to the timer task
         * @param j2 idle timeout, passed on to the super constructor
         */
        private AutomaticWatermarkContext(Output<StreamRecord<T>> output, long j, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService, Object obj, long j2) {
            super(processingTimeService, obj, j2);
            this.idle = false;
            this.output = (Output) Preconditions.checkNotNull(output, "The output cannot be null.");
            Preconditions.checkArgument(j >= 1, "The watermark interval cannot be smaller than 1 ms.");
            this.watermarkInterval = j;
            this.reuse = new StreamRecord<>(null);
            // Sentinel meaning "no record collected yet".
            this.lastRecordTime = Long.MIN_VALUE;
            this.nextWatermarkTimer = this.timeService.registerTimer(this.timeService.getCurrentProcessingTime() + j, new WatermarkEmittingTask(this.timeService, obj, output));
        }

        /**
         * Emits the element stamped with the current processing time and, if that time already
         * lies past {@code nextWatermarkTime}, emits the matching watermark right away.
         */
        @Override
        protected void processAndCollect(T t) {
            this.lastRecordTime = this.timeService.getCurrentProcessingTime();
            this.output.collect(this.reuse.replace(t, this.lastRecordTime));
            if (this.lastRecordTime > this.nextWatermarkTime) {
                // Recomputed from the record time, so skipped intervals are jumped over.
                long j = this.lastRecordTime - (this.lastRecordTime % this.watermarkInterval);
                // Here the threshold is advanced before the watermark is emitted; the timer task
                // performs the same two steps in the opposite order.
                this.nextWatermarkTime = j + this.watermarkInterval;
                this.output.emitWatermark(new Watermark(j));
            }
        }

        /** Ignores the supplied timestamp {@code j} and behaves exactly like {@link #processAndCollect(Object)}. */
        @Override
        protected void processAndCollectWithTimestamp(T t, long j) {
            processAndCollect(t);
        }

        /**
         * Accepts only a watermark with timestamp {@code Long.MAX_VALUE}, and only while
         * {@code nextWatermarkTime} has not itself been set to {@code Long.MAX_VALUE}.
         */
        @Override
        protected boolean allowWatermark(Watermark watermark) {
            return watermark.getTimestamp() == Long.MAX_VALUE && this.nextWatermarkTime != Long.MAX_VALUE;
        }

        /**
         * Forwards the watermark, raises {@code nextWatermarkTime} to {@code Long.MAX_VALUE} so no
         * automatic watermark comparison can succeed afterwards, and cancels the pending timer.
         */
        @Override
        protected void processAndEmitWatermark(Watermark watermark) {
            this.nextWatermarkTime = Long.MAX_VALUE;
            this.output.emitWatermark(watermark);
            // Read the volatile field once so the null check and the cancel see the same future.
            ScheduledFuture<?> scheduledFuture = this.nextWatermarkTimer;
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
        }

        /** Forwards the status only when its idle flag differs from the stored one, then stores it. */
        @Override
        protected void processAndEmitWatermarkStatus(WatermarkStatus watermarkStatus) {
            if (this.idle != watermarkStatus.isIdle()) {
                this.output.emitWatermarkStatus(watermarkStatus);
            }
            this.idle = watermarkStatus.isIdle();
        }

        /** Runs the superclass close logic and then cancels the pending watermark timer, if any. */
        @Override
        public void close() {
            super.close();
            // Read the volatile field once so the null check and the cancel see the same future.
            ScheduledFuture<?> scheduledFuture = this.nextWatermarkTimer;
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
        }
    }

    /**
     * Source context that stands in for a context which has already been closed.
     *
     * <p>Every attempt to emit a record, a watermark or an idleness marker fails with a
     * {@link FlinkRuntimeException}. The checkpoint lock stays available and {@link #close()} is
     * a no-op.
     */
    private static class ClosedContext<T> implements SourceFunction.SourceContext<T> {
        /** Message carried by the exception raised from every emitting method. */
        private static final String CLOSED_MESSAGE = "The Source Context has been closed already.";

        private final Object checkpointLock;

        /**
         * @param lock checkpoint lock to keep handing out; stored as given, without a null check
         */
        private ClosedContext(Object lock) {
            this.checkpointLock = lock;
        }

        /** Always fails: the context is closed. */
        @Override
        public void collect(T t) {
            rejectUse();
        }

        /** Always fails: the context is closed. */
        @Override
        public void collectWithTimestamp(T t, long j) {
            rejectUse();
        }

        /** Always fails: the context is closed. */
        @Override
        public void emitWatermark(Watermark watermark) {
            rejectUse();
        }

        /** Always fails: the context is closed. */
        @Override
        public void markAsTemporarilyIdle() {
            rejectUse();
        }

        /** Returns the lock supplied at construction time. */
        @Override
        public Object getCheckpointLock() {
            return checkpointLock;
        }

        /** Closing an already closed context does nothing. */
        @Override
        public void close() {
        }

        /** Raises the exception shared by all emitting methods. */
        private void rejectUse() {
            throw new FlinkRuntimeException(CLOSED_MESSAGE);
        }
    }

    /**
     * Source context that forwards records and watermarks exactly as the source function hands
     * them over.
     *
     * <p>Records collected without a timestamp are emitted without one; watermarks are forwarded
     * when progressive watermarks are enabled or when the watermark carries
     * {@code Long.MAX_VALUE}.
     */
    private static class ManualWatermarkContext<T> extends WatermarkContext<T> {
        private final Output<StreamRecord<T>> output;
        /** Single record instance that is re-filled for every emitted element. */
        private final StreamRecord<T> reuse = new StreamRecord<>(null);
        private final boolean emitProgressiveWatermarks;
        /** Idle flag of the last watermark status that was handled; starts out as not idle. */
        private boolean idle = false;

        /**
         * @param output downstream output; must not be null
         * @param processingTimeService clock and timer service handed to the super constructor
         * @param lock checkpoint lock handed to the super constructor
         * @param idleTimeoutMillis idle timeout handed to the super constructor
         * @param progressiveWatermarks whether watermarks other than {@code Long.MAX_VALUE} may pass
         */
        private ManualWatermarkContext(
                Output<StreamRecord<T>> output,
                org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService,
                Object lock,
                long idleTimeoutMillis,
                boolean progressiveWatermarks) {
            super(processingTimeService, lock, idleTimeoutMillis);
            this.emitProgressiveWatermarks = progressiveWatermarks;
            this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
        }

        /** Emits the element without attaching a timestamp. */
        @Override
        protected void processAndCollect(T t) {
            output.collect(reuse.replace(t));
        }

        /** Emits the element with the timestamp {@code j} supplied by the source function. */
        @Override
        protected void processAndCollectWithTimestamp(T t, long j) {
            output.collect(reuse.replace(t, j));
        }

        /** Forwards the watermark unchanged. */
        @Override
        protected void processAndEmitWatermark(Watermark watermark) {
            output.emitWatermark(watermark);
        }

        /** Forwards the status only when its idle flag differs from the one seen last. */
        @Override
        protected void processAndEmitWatermarkStatus(WatermarkStatus watermarkStatus) {
            final boolean nowIdle = watermarkStatus.isIdle();
            if (nowIdle == idle) {
                // No transition: nothing to forward and nothing to record.
                return;
            }
            output.emitWatermarkStatus(watermarkStatus);
            idle = nowIdle;
        }

        /** Lets every watermark pass in progressive mode, otherwise only {@code Long.MAX_VALUE}. */
        @Override
        protected boolean allowWatermark(Watermark watermark) {
            if (emitProgressiveWatermarks) {
                return true;
            }
            return watermark.getTimestamp() == Long.MAX_VALUE;
        }
    }

    /**
     * Source context that emits records without timestamps and discards everything related to
     * watermarks.
     *
     * <p>Timestamps passed to {@link #collectWithTimestamp(Object, long)} are dropped, and both
     * {@link #emitWatermark(Watermark)} and {@link #markAsTemporarilyIdle()} do nothing.
     */
    private static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {
        private final Object lock;
        private final Output<StreamRecord<T>> output;
        /** Single record instance that is re-filled for every emitted element. */
        private final StreamRecord<T> reuse;

        /**
         * @param checkpointLock lock held while a record is emitted; must not be null
         * @param output downstream output; must not be null
         */
        private NonTimestampContext(Object checkpointLock, Output<StreamRecord<T>> output) {
            this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
            this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
            this.reuse = new StreamRecord<>(null);
        }

        /** Emits the element, without a timestamp, while holding the checkpoint lock. */
        @Override
        public void collect(T t) {
            forward(t);
        }

        /** Emits the element like {@link #collect(Object)}; the timestamp {@code j} is ignored. */
        @Override
        public void collectWithTimestamp(T t, long j) {
            forward(t);
        }

        /** Watermarks are not supported by this context and are silently dropped. */
        @Override
        public void emitWatermark(Watermark watermark) {
        }

        /** Idleness is not tracked by this context; the call has no effect. */
        @Override
        public void markAsTemporarilyIdle() {
        }

        /** Returns the lock supplied at construction time. */
        @Override
        public Object getCheckpointLock() {
            return lock;
        }

        /** Nothing to release. */
        @Override
        public void close() {
        }

        /** Shared emission path of both collect variants. */
        private void forward(T element) {
            synchronized (lock) {
                output.collect(reuse.replace(element));
            }
        }
    }

    /**
     * Delegating source context that swaps its delegate for a {@link ClosedContext} once
     * {@link #close()} has been called, so later emissions fail instead of reaching the closed
     * delegate.
     */
    private static class SwitchingOnClose<T> implements SourceFunction.SourceContext<T> {
        /** Context currently receiving all calls; replaced on close. */
        private SourceFunction.SourceContext<T> delegate;

        /**
         * @param initialContext context that receives all calls until this wrapper is closed
         */
        private SwitchingOnClose(SourceFunction.SourceContext<T> initialContext) {
            this.delegate = initialContext;
        }

        /** Passes the element on to the current delegate. */
        @Override
        public void collect(T t) {
            delegate.collect(t);
        }

        /** Passes the element and its timestamp on to the current delegate. */
        @Override
        public void collectWithTimestamp(T t, long j) {
            delegate.collectWithTimestamp(t, j);
        }

        /** Passes the watermark on to the current delegate. */
        @Override
        public void emitWatermark(Watermark watermark) {
            delegate.emitWatermark(watermark);
        }

        /** Passes the idleness marker on to the current delegate. */
        @Override
        public void markAsTemporarilyIdle() {
            delegate.markAsTemporarilyIdle();
        }

        /** Returns the checkpoint lock of the current delegate. */
        @Override
        public Object getCheckpointLock() {
            return delegate.getCheckpointLock();
        }

        /**
         * Closes the current delegate and installs a {@link ClosedContext} that reuses the
         * delegate's checkpoint lock.
         */
        @Override
        public void close() {
            final SourceFunction.SourceContext<T> closing = delegate;
            closing.close();
            // The lock is fetched only after the delegate has been closed.
            delegate = new ClosedContext<>(closing.getCheckpointLock());
        }
    }

    /**
     * Skeleton for source contexts that report watermark status and can detect idleness.
     *
     * <p>All emitting entry points take the checkpoint lock, report the source as active and
     * refresh the idleness bookkeeping, then delegate to the {@code processAnd...} hooks
     * implemented by subclasses. When an idle timeout other than {@code -1} is configured, an
     * {@link IdlenessDetectionTask} timer reports the source as idle if nothing was emitted
     * between its scheduling and its firing.
     */
    public static abstract class WatermarkContext<T> implements SourceFunction.SourceContext<T> {
        /** Idle-timeout value for which no idleness detection timer is ever scheduled. */
        private static final long IDLE_DETECTION_DISABLED = -1L;

        protected final org.apache.flink.streaming.runtime.tasks.ProcessingTimeService timeService;
        protected final Object checkpointLock;
        protected final long idleTimeout;

        /** Pending idleness check, or {@code null} once that check has fired. */
        private ScheduledFuture<?> nextCheck;
        /** {@code true} while nothing has been emitted since the pending check was scheduled. */
        private volatile boolean failOnNextCheck;

        /**
         * Timer callback that either reports the source as idle or schedules the next check.
         */
        public class IdlenessDetectionTask implements ProcessingTimeService.ProcessingTimeCallback {
            private IdlenessDetectionTask() {
            }

            /**
             * Evaluates the idleness flag under the checkpoint lock.
             *
             * @param j timestamp handed over by the timer service; not used
             */
            public void onProcessingTime(long j) throws Exception {
                synchronized (checkpointLock) {
                    // This check has fired, so it is no longer pending.
                    nextCheck = null;
                    if (!failOnNextCheck) {
                        // Something was emitted in the meantime: watch for another period.
                        scheduleNextIdleDetectionTask();
                        return;
                    }
                    markAsTemporarilyIdle();
                }
            }
        }

        /**
         * Validates the arguments and schedules the first idleness check, if enabled.
         *
         * @param processingTimeService clock and timer service; must not be null
         * @param obj checkpoint lock; must not be null
         * @param j idle timeout in milliseconds, either {@code -1} (no detection) or at least 1
         */
        public WatermarkContext(org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService, Object obj, long j) {
            this.timeService = Preconditions.checkNotNull(processingTimeService, "Time Service cannot be null.");
            this.checkpointLock = Preconditions.checkNotNull(obj, "Checkpoint Lock cannot be null.");
            Preconditions.checkArgument(j == IDLE_DETECTION_DISABLED || j >= 1, "The idle timeout cannot be smaller than 1 ms.");
            this.idleTimeout = j;
            scheduleNextIdleDetectionTask();
        }

        /** Reports the source as active and hands the element to {@link #processAndCollect}. */
        @Override
        public final void collect(T t) {
            synchronized (checkpointLock) {
                markActiveAndRefreshIdleDetection();
                processAndCollect(t);
            }
        }

        /**
         * Reports the source as active and hands the element and timestamp to
         * {@link #processAndCollectWithTimestamp}.
         */
        @Override
        public final void collectWithTimestamp(T t, long j) {
            synchronized (checkpointLock) {
                markActiveAndRefreshIdleDetection();
                processAndCollectWithTimestamp(t, j);
            }
        }

        /**
         * Forwards the watermark if {@link #allowWatermark} accepts it; rejected watermarks are
         * dropped without taking the lock or touching the idleness state.
         */
        @Override
        public final void emitWatermark(Watermark watermark) {
            if (!allowWatermark(watermark)) {
                return;
            }
            synchronized (checkpointLock) {
                markActiveAndRefreshIdleDetection();
                processAndEmitWatermark(watermark);
            }
        }

        /** Reports the source as idle under the checkpoint lock. */
        @Override
        public final void markAsTemporarilyIdle() {
            synchronized (checkpointLock) {
                processAndEmitWatermarkStatus(WatermarkStatus.IDLE);
            }
        }

        /** Returns the lock supplied at construction time. */
        @Override
        public Object getCheckpointLock() {
            return checkpointLock;
        }

        /** Cancels the pending idleness check, if there is one. */
        @Override
        public void close() {
            cancelNextIdleDetectionTask();
        }

        /**
         * Reports the source as active and records the activity for idleness detection: a pending
         * check is told not to fire as idle, a missing one is scheduled afresh. Callers hold the
         * checkpoint lock.
         */
        private void markActiveAndRefreshIdleDetection() {
            processAndEmitWatermarkStatus(WatermarkStatus.ACTIVE);
            if (nextCheck == null) {
                scheduleNextIdleDetectionTask();
            } else {
                failOnNextCheck = false;
            }
        }

        /** Arms the idleness flag and registers a check one idle timeout from now, if enabled. */
        private void scheduleNextIdleDetectionTask() {
            if (idleTimeout == IDLE_DETECTION_DISABLED) {
                return;
            }
            failOnNextCheck = true;
            final long fireAt = timeService.getCurrentProcessingTime() + idleTimeout;
            nextCheck = timeService.registerTimer(fireAt, new IdlenessDetectionTask());
        }

        /** Cancels the pending idleness check without clearing the stored handle. */
        protected void cancelNextIdleDetectionTask() {
            // Work on a snapshot so the null test and the cancel call use the same future.
            final ScheduledFuture<?> pending = nextCheck;
            if (pending != null) {
                pending.cancel(true);
            }
        }

        /** Emits an element that came without a timestamp. */
        protected abstract void processAndCollect(T t);

        /** Emits an element together with the timestamp supplied by the source function. */
        protected abstract void processAndCollectWithTimestamp(T t, long j);

        /** Decides whether a watermark handed in by the source function may be forwarded. */
        protected abstract boolean allowWatermark(Watermark watermark);

        /** Emits a watermark that passed {@link #allowWatermark}. */
        protected abstract void processAndEmitWatermark(Watermark watermark);

        /** Handles a change request to the given watermark status. */
        protected abstract void processAndEmitWatermarkStatus(WatermarkStatus watermarkStatus);
    }

    /**
     * Creates the source context handed to a source function: a {@link ManualWatermarkContext}
     * wrapped in a {@link SwitchingOnClose}.
     *
     * @param processingTimeService clock and timer service used by the context
     * @param obj checkpoint lock
     * @param output downstream output that receives records and watermarks
     * @param j not used by this implementation
     * @param j2 idle timeout passed to the manual watermark context
     * @param z whether watermarks other than {@code Long.MAX_VALUE} are forwarded
     * @return the wrapped source context
     */
    public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService, Object obj, Output<StreamRecord<OUT>> output, long j, long j2, boolean z) {
        final SourceFunction.SourceContext<OUT> manualContext =
                new ManualWatermarkContext<>(output, processingTimeService, obj, j2, z);
        return new SwitchingOnClose<>(manualContext);
    }
}
