package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.TestCheckpointStorageWorkerView;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.class */
class SubtaskCheckpointCoordinatorTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest$BlockingRunnableFuture.class */
    private static final class BlockingRunnableFuture implements RunnableFuture<SnapshotResult<KeyedStateHandle>> {
        private final CompletableFuture<SnapshotResult<KeyedStateHandle>> future;
        private final OneShotLatch signalRunLatch;
        private final CountDownLatch countDownLatch;
        private final SnapshotResult<KeyedStateHandle> value;

        private BlockingRunnableFuture() {
            this.future = new CompletableFuture<>();
            this.signalRunLatch = new OneShotLatch();
            this.countDownLatch = new CountDownLatch(2);
            this.value = SnapshotResult.empty();
        }

        @Override // java.util.concurrent.RunnableFuture, java.lang.Runnable
        public void run() {
            this.signalRunLatch.trigger();
            this.countDownLatch.countDown();
            try {
                this.countDownLatch.await();
            } catch (InterruptedException e) {
                ExceptionUtils.rethrow(e);
            }
            this.future.complete(this.value);
        }

        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            this.future.cancel(z);
            return true;
        }

        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            return this.future.isCancelled();
        }

        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return this.future.isDone();
        }

        @Override // java.util.concurrent.Future
        public SnapshotResult<KeyedStateHandle> get() throws InterruptedException, ExecutionException {
            return this.future.get();
        }

        @Override // java.util.concurrent.Future
        public SnapshotResult<KeyedStateHandle> get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException {
            return this.future.get();
        }

        void awaitRun() throws InterruptedException {
            this.signalRunLatch.await();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest$CheckpointOperator.class */
    private static class CheckpointOperator implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1;
        private final OperatorSnapshotFutures operatorSnapshotFutures;
        private boolean checkpointed = false;

        CheckpointOperator(OperatorSnapshotFutures operatorSnapshotFutures) {
            this.operatorSnapshotFutures = operatorSnapshotFutures;
        }

        boolean isCheckpointed() {
            return this.checkpointed;
        }

        public void open() throws Exception {
        }

        public void finish() throws Exception {
        }

        public void close() throws Exception {
        }

        public void prepareSnapshotPreBarrier(long j) {
        }

        public OperatorSnapshotFutures snapshotState(long j, long j2, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStreamFactory) throws Exception {
            this.checkpointed = true;
            return this.operatorSnapshotFutures;
        }

        public void initializeState(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        }

        public void setKeyContextElement1(StreamRecord<?> streamRecord) {
        }

        public void setKeyContextElement2(StreamRecord<?> streamRecord) {
        }

        public OperatorMetricGroup getMetricGroup() {
            return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
        }

        public OperatorID getOperatorID() {
            return new OperatorID();
        }

        public void notifyCheckpointComplete(long j) {
        }

        public void notifyCheckpointAborted(long j) {
        }

        public void setCurrentKey(Object obj) {
        }

        public Object getCurrentKey() {
            return null;
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
        }

        public void processWatermark(Watermark watermark) throws Exception {
        }

        public void processLatencyMarker(LatencyMarker latencyMarker) {
        }

        public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest$MapOperator.class */
    private static class MapOperator extends StreamMap<String, String> {
        private static final long serialVersionUID = 1;

        public MapOperator() {
            super(str -> {
                return str;
            });
        }

        public void notifyCheckpointAborted(long j) throws Exception {
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -2104837527:
                    if (implMethodName.equals("lambda$new$e0defa2f$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest$MapOperator") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/String;")) {
                        return str -> {
                            return str;
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    SubtaskCheckpointCoordinatorTest() {
    }

    @Test
    void testInitCheckpoint() throws IOException, CheckpointException {
        Assertions.assertThat(initCheckpoint(true, CheckpointType.CHECKPOINT)).isTrue();
        Assertions.assertThat(initCheckpoint(false, CheckpointType.CHECKPOINT)).isFalse();
        Assertions.assertThat(initCheckpoint(false, SavepointType.savepoint(SavepointFormatType.CANONICAL))).isFalse();
    }

    private boolean initCheckpoint(boolean z, SnapshotType snapshotType) throws IOException, CheckpointException {
        ChannelStateWriter channelStateWriter = new ChannelStateWriter.NoOpChannelStateWriter() { // from class: org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorTest.1MockWriter
            private boolean started;

            public void start(long j, CheckpointOptions checkpointOptions) {
                this.started = true;
            }
        };
        SubtaskCheckpointCoordinator coordinator = coordinator(channelStateWriter);
        CheckpointStorageLocationReference checkpointStorageLocationReference = CheckpointStorageLocationReference.getDefault();
        coordinator.initInputsCheckpoint(1L, z ? CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, checkpointStorageLocationReference) : CheckpointOptions.alignedNoTimeout(snapshotType, checkpointStorageLocationReference));
        return ((C1MockWriter) channelStateWriter).started;
    }

    @Test
    void testNotifyCheckpointComplete() throws Exception {
        TestTaskStateManager testTaskStateManager = new TestTaskStateManager();
        Environment build = MockEnvironment.builder().setTaskStateManager(testTaskStateManager).build();
        SubtaskCheckpointCoordinator build2 = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(build).build();
        Throwable th = null;
        try {
            try {
                OperatorChain<?, ?> operatorChain = getOperatorChain(build);
                build2.notifyCheckpointComplete(42L, operatorChain, () -> {
                    return true;
                });
                Assertions.assertThat(testTaskStateManager.getNotifiedCompletedCheckpointId()).isEqualTo(42L);
                long j = 42 + 1;
                build2.notifyCheckpointComplete(j, operatorChain, () -> {
                    return false;
                });
                Assertions.assertThat(testTaskStateManager.getNotifiedCompletedCheckpointId()).isEqualTo(j);
                if (build2 != null) {
                    if (0 == 0) {
                        build2.close();
                        return;
                    }
                    try {
                        build2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build2 != null) {
                if (th != null) {
                    try {
                        build2.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build2.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testSavepointNotResultingInPriorityEvents() throws Exception {
        Environment build = MockEnvironment.builder().build();
        SubtaskCheckpointCoordinator build2 = new MockSubtaskCheckpointCoordinatorBuilder().setUnalignedCheckpointEnabled(true).setEnvironment(build).build();
        Throwable th = null;
        try {
            try {
                final AtomicReference atomicReference = new AtomicReference(null);
                build2.checkpointState(new CheckpointMetaData(0L, 0L), new CheckpointOptions(SavepointType.savepoint(SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault()), new CheckpointMetricsBuilder(), new RegularOperatorChain(new MockStreamTaskBuilder(build).build(), new NonRecordWriter()) { // from class: org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorTest.1
                    public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
                        super.broadcastEvent(abstractEvent, z);
                        atomicReference.set(Boolean.valueOf(z));
                    }
                }, false, () -> {
                    return true;
                });
                Assertions.assertThat((Boolean) atomicReference.get()).isFalse();
                if (build2 != null) {
                    if (0 == 0) {
                        build2.close();
                        return;
                    }
                    try {
                        build2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build2 != null) {
                if (th != null) {
                    try {
                        build2.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build2.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testForceAlignedCheckpointResultingInPriorityEvents() throws Exception {
        Environment build = MockEnvironment.builder().build();
        final SubtaskCheckpointCoordinator build2 = new MockSubtaskCheckpointCoordinatorBuilder().setUnalignedCheckpointEnabled(true).setEnvironment(build).build();
        Throwable th = null;
        try {
            try {
                final AtomicReference atomicReference = new AtomicReference(null);
                build2.checkpointState(new CheckpointMetaData(42L, 0L), CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()).withUnalignedUnsupported(), new CheckpointMetricsBuilder(), new RegularOperatorChain(new MockStreamTaskBuilder(build).build(), new NonRecordWriter()) { // from class: org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorTest.2
                    public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
                        super.broadcastEvent(abstractEvent, z);
                        atomicReference.set(Boolean.valueOf(z));
                        build2.getChannelStateWriter().addOutputData(42L, new ResultSubpartitionInfo(0, 0), 0, new Buffer[]{BufferBuilderTestUtils.buildSomeBuffer(500)});
                    }
                }, false, () -> {
                    return true;
                });
                Assertions.assertThat((Boolean) atomicReference.get()).isTrue();
                if (build2 != null) {
                    if (0 == 0) {
                        build2.close();
                        return;
                    }
                    try {
                        build2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build2 != null) {
                if (th != null) {
                    try {
                        build2.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build2.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testSkipChannelStateForSavepoints() throws Exception {
        SubtaskCheckpointCoordinator build = new MockSubtaskCheckpointCoordinatorBuilder().setUnalignedCheckpointEnabled(true).setPrepareInputSnapshot((channelStateWriter, l) -> {
            Assertions.fail("should not prepare input snapshot for savepoint");
            return null;
        }).build();
        Throwable th = null;
        try {
            build.checkpointState(new CheckpointMetaData(0L, 0L), new CheckpointOptions(SavepointType.savepoint(SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault()), new CheckpointMetricsBuilder(), new RegularOperatorChain(new StreamTaskITCase.NoOpStreamTask(new DummyEnvironment()), new NonRecordWriter()), false, () -> {
                return true;
            });
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testNotifyCheckpointSubsumed() throws Exception {
        Environment build = MockEnvironment.builder().setTaskStateManager(new TestTaskStateManager()).build();
        SubtaskCheckpointCoordinatorImpl build2 = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(build).setUnalignedCheckpointEnabled(true).build();
        Throwable th = null;
        try {
            try {
                OneInputStreamOperator streamMap = new StreamMap(str -> {
                    return str;
                });
                streamMap.setProcessingTimeService(new TestProcessingTimeService());
                OperatorChain operatorChain = operatorChain(streamMap);
                operatorChain.initializeStateAndOpenOperators(new StreamTaskStateInitializerImpl(build, new TestStateBackend()));
                build2.getChannelStateWriter().start(42L, CheckpointOptions.forCheckpointWithDefaultLocation());
                build2.checkpointState(new CheckpointMetaData(42L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), operatorChain, false, () -> {
                    return false;
                });
                long j = 42 + 1;
                build2.notifyCheckpointSubsumed(j, operatorChain, () -> {
                    return true;
                });
                Assertions.assertThat(streamMap.getKeyedStateBackend().getSubsumeCheckpointId()).isEqualTo(j);
                if (build2 != null) {
                    if (0 == 0) {
                        build2.close();
                        return;
                    }
                    try {
                        build2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build2 != null) {
                if (th != null) {
                    try {
                        build2.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build2.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testNotifyCheckpointAbortedManyTimes() throws Exception {
        Environment build = MockEnvironment.builder().build();
        SubtaskCheckpointCoordinatorImpl build2 = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(build).setMaxRecordAbortedCheckpoints(256).build();
        Throwable th = null;
        try {
            try {
                OperatorChain<?, ?> operatorChain = getOperatorChain(build);
                long j = 256 + 42;
                for (int i = 1; i < j; i++) {
                    build2.notifyCheckpointAborted(i, operatorChain, () -> {
                        return true;
                    });
                    Assertions.assertThat(build2.getAbortedCheckpointSize()).isEqualTo(Math.min(256, i));
                }
                if (build2 != null) {
                    if (0 == 0) {
                        build2.close();
                        return;
                    }
                    try {
                        build2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build2 != null) {
                if (th != null) {
                    try {
                        build2.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build2.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testNotifyCheckpointAbortedBeforeAsyncPhase() throws Exception {
        TestTaskStateManager testTaskStateManager = new TestTaskStateManager();
        SubtaskCheckpointCoordinatorImpl build = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(MockEnvironment.builder().setTaskStateManager(testTaskStateManager).build()).setUnalignedCheckpointEnabled(true).build();
        Throwable th = null;
        try {
            try {
                CheckpointOperator checkpointOperator = new CheckpointOperator(new OperatorSnapshotFutures());
                OperatorChain operatorChain = operatorChain(checkpointOperator);
                build.notifyCheckpointAborted(42L, operatorChain, () -> {
                    return true;
                });
                Assertions.assertThat(build.getAbortedCheckpointSize()).isOne();
                build.getChannelStateWriter().start(42L, CheckpointOptions.forCheckpointWithDefaultLocation());
                build.checkpointState(new CheckpointMetaData(42L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), operatorChain, false, () -> {
                    return false;
                });
                Assertions.assertThat(checkpointOperator.isCheckpointed()).isFalse();
                Assertions.assertThat(testTaskStateManager.getReportedCheckpointId()).isEqualTo(-1L);
                Assertions.assertThat(build.getAbortedCheckpointSize()).isZero();
                Assertions.assertThat(build.getAsyncCheckpointRunnableSize()).isZero();
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testBroadcastCancelCheckpointMarkerOnAbortingFromCoordinator() throws Exception {
        OneInputStreamTaskTestHarness oneInputStreamTaskTestHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, 1, 1, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamTaskTestHarness.setupOutputForSingletonOperatorChain();
        oneInputStreamTaskTestHarness.getStreamConfig().setStreamOperator(new MapOperator());
        StreamMockEnvironment streamMockEnvironment = new StreamMockEnvironment(oneInputStreamTaskTestHarness.jobConfig, oneInputStreamTaskTestHarness.taskConfig, oneInputStreamTaskTestHarness.executionConfig, oneInputStreamTaskTestHarness.memorySize, new MockInputSplitProvider(), oneInputStreamTaskTestHarness.bufferSize, oneInputStreamTaskTestHarness.taskStateManager);
        SubtaskCheckpointCoordinator build = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(streamMockEnvironment).build();
        Throwable th = null;
        try {
            try {
                ArrayList arrayList = new ArrayList();
                streamMockEnvironment.addOutput(new RecordOrEventCollectingResultPartitionWriter(arrayList, new StreamElementSerializer(StringSerializer.INSTANCE)));
                oneInputStreamTaskTestHarness.invoke(streamMockEnvironment);
                oneInputStreamTaskTestHarness.waitForTaskRunning();
                OperatorChain operatorChain = oneInputStreamTaskTestHarness.mo166getTask().operatorChain;
                build.notifyCheckpointAborted(42L, operatorChain, () -> {
                    return true;
                });
                build.checkpointState(new CheckpointMetaData(42L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), operatorChain, false, () -> {
                    return false;
                });
                Assertions.assertThat(arrayList).hasSize(1);
                Object obj = arrayList.get(0);
                Assertions.assertThat(obj).isInstanceOf(CancelCheckpointMarker.class);
                Assertions.assertThat(((CancelCheckpointMarker) obj).getCheckpointId()).isEqualTo(42L);
                oneInputStreamTaskTestHarness.endInput();
                oneInputStreamTaskTestHarness.waitForTaskCompletion();
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testNotifyCheckpointAbortedDuringAsyncPhase() throws Exception {
        SubtaskCheckpointCoordinatorImpl build = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(MockEnvironment.builder().build()).setExecutor(Executors.newFixedThreadPool(2)).setUnalignedCheckpointEnabled(true).build();
        Throwable th = null;
        try {
            try {
                BlockingRunnableFuture blockingRunnableFuture = new BlockingRunnableFuture();
                OperatorChain operatorChain = operatorChain(new CheckpointOperator(new OperatorSnapshotFutures(DoneFuture.of(SnapshotResult.empty()), blockingRunnableFuture, DoneFuture.of(SnapshotResult.empty()), DoneFuture.of(SnapshotResult.empty()), DoneFuture.of(SnapshotResult.empty()), DoneFuture.of(SnapshotResult.empty()))));
                build.getChannelStateWriter().start(42L, CheckpointOptions.forCheckpointWithDefaultLocation());
                build.checkpointState(new CheckpointMetaData(42L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), operatorChain, false, () -> {
                    return false;
                });
                blockingRunnableFuture.awaitRun();
                Assertions.assertThat(build.getAsyncCheckpointRunnableSize()).isOne();
                Assertions.assertThat(blockingRunnableFuture.isCancelled()).isFalse();
                build.notifyCheckpointAborted(42L, operatorChain, () -> {
                    return true;
                });
                while (!blockingRunnableFuture.isDone()) {
                    Thread.sleep(10L);
                }
                Assertions.assertThat(blockingRunnableFuture.isCancelled()).isTrue();
                Assertions.assertThat(build.getAbortedCheckpointSize()).isZero();
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testNotifyCheckpointAbortedAfterAsyncPhase() throws Exception {
        TestTaskStateManager testTaskStateManager = new TestTaskStateManager();
        Environment build = MockEnvironment.builder().setTaskStateManager(testTaskStateManager).build();
        SubtaskCheckpointCoordinatorImpl build2 = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(build).build();
        Throwable th = null;
        try {
            try {
                OperatorChain<?, ?> operatorChain = getOperatorChain(build);
                build2.checkpointState(new CheckpointMetaData(42L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), operatorChain, false, () -> {
                    return false;
                });
                build2.notifyCheckpointAborted(42L, operatorChain, () -> {
                    return true;
                });
                Assertions.assertThat(build2.getAbortedCheckpointSize()).isZero();
                Assertions.assertThat(testTaskStateManager.getNotifiedAbortedCheckpointId()).isEqualTo(42L);
                if (build2 != null) {
                    if (0 == 0) {
                        build2.close();
                        return;
                    }
                    try {
                        build2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build2 != null) {
                if (th != null) {
                    try {
                        build2.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build2.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testTimeoutableAlignedBarrierNotPriorityAndChannelStateResult() throws Exception {
        SubtaskCheckpointCoordinator build = new MockSubtaskCheckpointCoordinatorBuilder().setUnalignedCheckpointEnabled(true).setEnvironment(MockEnvironment.builder().build()).build();
        Throwable th = null;
        try {
            try {
                final AtomicReference atomicReference = new AtomicReference(null);
                final AtomicReference atomicReference2 = new AtomicReference(null);
                RegularOperatorChain regularOperatorChain = new RegularOperatorChain(new StreamTaskITCase.NoOpStreamTask(new DummyEnvironment()), new NonRecordWriter()) { // from class: org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorTest.3
                    public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
                        super.broadcastEvent(abstractEvent, z);
                        atomicReference.set(Boolean.valueOf(z));
                    }

                    public void snapshotState(Map map, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, Supplier supplier, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, CheckpointStreamFactory checkpointStreamFactory) {
                        atomicReference2.set(channelStateWriteResult);
                        sendAcknowledgeCheckpointEvent(checkpointMetaData.getCheckpointId());
                    }
                };
                CheckpointOptions checkpointOptions = new CheckpointOptions(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), CheckpointOptions.AlignmentType.ALIGNED, 200L);
                build.initInputsCheckpoint(66L, checkpointOptions);
                build.checkpointState(new CheckpointMetaData(66L, System.currentTimeMillis()), checkpointOptions, new CheckpointMetricsBuilder(), regularOperatorChain, false, () -> {
                    return true;
                });
                Assertions.assertThat((Boolean) atomicReference.get()).isFalse();
                Assertions.assertThat(atomicReference2.get()).isNotNull();
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted() throws Exception {
        DummyEnvironment dummyEnvironment = new DummyEnvironment();
        ChannelStateWriterImpl channelStateWriterImpl = new ChannelStateWriterImpl(dummyEnvironment.getJobVertexId(), "test", 0, new JobManagerCheckpointStorage(), dummyEnvironment.getChannelStateExecutorFactory(), 5);
        MockEnvironment build = MockEnvironment.builder().build();
        Throwable th = null;
        try {
            SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinatorImpl = new SubtaskCheckpointCoordinatorImpl(new TestCheckpointStorageWorkerView(100), "test", StreamTaskActionExecutor.IMMEDIATE, MoreExecutors.newDirectExecutorService(), dummyEnvironment, (str, th2) -> {
            }, (channelStateWriter, l) -> {
                return CompletableFuture.completedFuture(null);
            }, 1, channelStateWriterImpl, true, (callable, duration) -> {
                return () -> {
                };
            });
            Throwable th3 = null;
            try {
                try {
                    OperatorChain<?, ?> operatorChain = getOperatorChain(build);
                    subtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(1, operatorChain, () -> {
                        return true;
                    });
                    subtaskCheckpointCoordinatorImpl.initInputsCheckpoint(1, CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()));
                    ChannelStateWriter.ChannelStateWriteResult writeResult = channelStateWriterImpl.getWriteResult(1);
                    Assertions.assertThat(writeResult).isNotNull();
                    subtaskCheckpointCoordinatorImpl.checkpointState(new CheckpointMetaData(1, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), operatorChain, false, () -> {
                        return true;
                    });
                    Assertions.assertThat(channelStateWriterImpl.getWriteResult(1)).isNull();
                    writeResult.waitForDone();
                    Assertions.assertThat(writeResult.isDone()).isTrue();
                    Assertions.assertThat(writeResult.getInputChannelStateHandles().isCompletedExceptionally()).isTrue();
                    Assertions.assertThat(writeResult.getResultSubpartitionStateHandles().isCompletedExceptionally()).isTrue();
                    if (subtaskCheckpointCoordinatorImpl != null) {
                        if (0 != 0) {
                            try {
                                subtaskCheckpointCoordinatorImpl.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            subtaskCheckpointCoordinatorImpl.close();
                        }
                    }
                    if (build != null) {
                        if (0 == 0) {
                            build.close();
                            return;
                        }
                        try {
                            build.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th3 = th6;
                    throw th6;
                }
            } catch (Throwable th7) {
                if (subtaskCheckpointCoordinatorImpl != null) {
                    if (th3 != null) {
                        try {
                            subtaskCheckpointCoordinatorImpl.close();
                        } catch (Throwable th8) {
                            th3.addSuppressed(th8);
                        }
                    } else {
                        subtaskCheckpointCoordinatorImpl.close();
                    }
                }
                throw th7;
            }
        } catch (Throwable th9) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th10) {
                        th.addSuppressed(th10);
                    }
                } else {
                    build.close();
                }
            }
            throw th9;
        }
    }

    @Test
    void testAbortOldAndStartNewCheckpoint() throws Exception {
        CheckpointOptions unaligned = CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
        DummyEnvironment dummyEnvironment = new DummyEnvironment();
        ChannelStateWriterImpl channelStateWriterImpl = new ChannelStateWriterImpl(dummyEnvironment.getJobVertexId(), "test", 0, new JobManagerCheckpointStorage(), dummyEnvironment.getChannelStateExecutorFactory(), 5);
        MockEnvironment build = MockEnvironment.builder().build();
        Throwable th = null;
        try {
            SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinatorImpl = new SubtaskCheckpointCoordinatorImpl(new TestCheckpointStorageWorkerView(100), "test", StreamTaskActionExecutor.IMMEDIATE, MoreExecutors.newDirectExecutorService(), dummyEnvironment, (str, th2) -> {
            }, (channelStateWriter, l) -> {
                return CompletableFuture.completedFuture(null);
            }, 1, channelStateWriterImpl, true, (callable, duration) -> {
                return () -> {
                };
            });
            Throwable th3 = null;
            try {
                try {
                    OperatorChain<?, ?> operatorChain = getOperatorChain(build);
                    subtaskCheckpointCoordinatorImpl.initInputsCheckpoint(42, unaligned);
                    ChannelStateWriter.ChannelStateWriteResult writeResult = channelStateWriterImpl.getWriteResult(42);
                    Assertions.assertThat(writeResult).isNotNull();
                    subtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(42, operatorChain, () -> {
                        return true;
                    });
                    subtaskCheckpointCoordinatorImpl.initInputsCheckpoint(43, unaligned);
                    ChannelStateWriter.ChannelStateWriteResult writeResult2 = channelStateWriterImpl.getWriteResult(43);
                    writeResult.waitForDone();
                    Assertions.assertThat(writeResult.isDone()).isTrue();
                    Assertions.assertThatThrownBy(() -> {
                    }).isInstanceOf(CancellationException.class);
                    subtaskCheckpointCoordinatorImpl.checkpointState(new CheckpointMetaData(43, System.currentTimeMillis()), unaligned, new CheckpointMetricsBuilder(), operatorChain, false, () -> {
                        return true;
                    });
                    writeResult2.waitForDone();
                    Assertions.assertThat(writeResult2).isNotNull();
                    Assertions.assertThat(writeResult2.isDone()).isTrue();
                    Assertions.assertThat(writeResult2.getInputChannelStateHandles().isCompletedExceptionally()).isFalse();
                    Assertions.assertThat(writeResult2.getResultSubpartitionStateHandles().isCompletedExceptionally()).isFalse();
                    if (subtaskCheckpointCoordinatorImpl != null) {
                        if (0 != 0) {
                            try {
                                subtaskCheckpointCoordinatorImpl.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            subtaskCheckpointCoordinatorImpl.close();
                        }
                    }
                    if (build != null) {
                        if (0 == 0) {
                            build.close();
                            return;
                        }
                        try {
                            build.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th3 = th6;
                    throw th6;
                }
            } catch (Throwable th7) {
                if (subtaskCheckpointCoordinatorImpl != null) {
                    if (th3 != null) {
                        try {
                            subtaskCheckpointCoordinatorImpl.close();
                        } catch (Throwable th8) {
                            th3.addSuppressed(th8);
                        }
                    } else {
                        subtaskCheckpointCoordinatorImpl.close();
                    }
                }
                throw th7;
            }
        } catch (Throwable th9) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th10) {
                        th.addSuppressed(th10);
                    }
                } else {
                    build.close();
                }
            }
            throw th9;
        }
    }

    private OperatorChain<?, ?> getOperatorChain(MockEnvironment mockEnvironment) throws Exception {
        return new RegularOperatorChain(new MockStreamTaskBuilder(mockEnvironment).build(), new NonRecordWriter());
    }

    private <T> OperatorChain<T, AbstractStreamOperator<T>> operatorChain(OneInputStreamOperator<T, T>... oneInputStreamOperatorArr) throws Exception {
        return OperatorChainTest.setupOperatorChain(oneInputStreamOperatorArr);
    }

    private static SubtaskCheckpointCoordinator coordinator(ChannelStateWriter channelStateWriter) throws IOException {
        return new SubtaskCheckpointCoordinatorImpl(new TestCheckpointStorageWorkerView(100), "test", StreamTaskActionExecutor.IMMEDIATE, MoreExecutors.newDirectExecutorService(), new DummyEnvironment(), (str, th) -> {
            Assertions.fail(str);
        }, (channelStateWriter2, l) -> {
            return CompletableFuture.completedFuture(null);
        }, 0, channelStateWriter, true, (callable, duration) -> {
            return () -> {
            };
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -529187117:
                if (implMethodName.equals("lambda$testNotifyCheckpointSubsumed$48b15570$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/String;")) {
                    return str -> {
                        return str;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
