/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.util.ArrayList;
import java.util.Random;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteResultUtil;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.checkpoint.channel.TestException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class ChannelStateWriteRequestDispatcherImplTest {
    private static final JobID JOB_ID = new JobID();
    private static final JobVertexID JOB_VERTEX_ID = new JobVertexID();
    private static final int SUBTASK_INDEX = 0;

    ChannelStateWriteRequestDispatcherImplTest() {
    }

    @Test
    void testPartialInputChannelStateWrite() throws Exception {
        this.testBuffersRecycled(buffers -> ChannelStateWriteRequest.write((JobVertexID)JOB_VERTEX_ID, (int)0, (long)1L, (InputChannelInfo)new InputChannelInfo(1, 2), (CloseableIterator)CloseableIterator.ofElements(Buffer::recycleBuffer, (Object[])buffers)));
    }

    @Test
    void testPartialResultSubpartitionStateWrite() throws Exception {
        this.testBuffersRecycled(buffers -> ChannelStateWriteRequest.write((JobVertexID)JOB_VERTEX_ID, (int)0, (long)1L, (ResultSubpartitionInfo)new ResultSubpartitionInfo(1, 2), (Buffer[])buffers));
    }

    private void testBuffersRecycled(Function<NetworkBuffer[], ChannelStateWriteRequest> requestBuilder) throws Exception {
        ChannelStateWriteRequestDispatcherImpl dispatcher = new ChannelStateWriteRequestDispatcherImpl(() -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID), (ChannelStateSerializer)new ChannelStateSerializerImpl());
        ChannelStateWriter.ChannelStateWriteResult result = new ChannelStateWriter.ChannelStateWriteResult();
        dispatcher.dispatch(ChannelStateWriteRequest.registerSubtask((JobVertexID)JOB_VERTEX_ID, (int)0));
        dispatcher.dispatch(ChannelStateWriteRequest.start((JobVertexID)JOB_VERTEX_ID, (int)0, (long)1L, (ChannelStateWriter.ChannelStateWriteResult)result, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
        result.getResultSubpartitionStateHandles().completeExceptionally(new TestException());
        result.getInputChannelStateHandles().completeExceptionally(new TestException());
        NetworkBuffer[] buffers = new NetworkBuffer[]{this.buffer(), this.buffer()};
        dispatcher.dispatch(requestBuilder.apply(buffers));
        for (NetworkBuffer buffer : buffers) {
            Assertions.assertThat((boolean)buffer.isRecycled()).isTrue();
        }
    }

    @Test
    void testStartNewCheckpointForSameSubtask() throws Exception {
        this.testStartNewCheckpointAndCheckOldCheckpointResult(false);
    }

    @Test
    void testStartNewCheckpointForDifferentSubtask() throws Exception {
        this.testStartNewCheckpointAndCheckOldCheckpointResult(true);
    }

    private void testStartNewCheckpointAndCheckOldCheckpointResult(boolean isDifferentSubtask) throws Exception {
        ChannelStateWriteRequestDispatcherImpl processor = new ChannelStateWriteRequestDispatcherImpl(() -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID), (ChannelStateSerializer)new ChannelStateSerializerImpl());
        ChannelStateWriter.ChannelStateWriteResult result = new ChannelStateWriter.ChannelStateWriteResult();
        processor.dispatch(ChannelStateWriteRequest.registerSubtask((JobVertexID)JOB_VERTEX_ID, (int)0));
        JobVertexID newJobVertex = JOB_VERTEX_ID;
        if (isDifferentSubtask) {
            newJobVertex = new JobVertexID();
            processor.dispatch(ChannelStateWriteRequest.registerSubtask((JobVertexID)newJobVertex, (int)0));
        }
        processor.dispatch(ChannelStateWriteRequest.start((JobVertexID)JOB_VERTEX_ID, (int)0, (long)1L, (ChannelStateWriter.ChannelStateWriteResult)result, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
        Assertions.assertThat((boolean)result.isDone()).isFalse();
        processor.dispatch(ChannelStateWriteRequest.start((JobVertexID)newJobVertex, (int)0, (long)2L, (ChannelStateWriter.ChannelStateWriteResult)new ChannelStateWriter.ChannelStateWriteResult(), (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
        ChannelStateWriteResultUtil.assertCheckpointFailureReason(result, CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED);
    }

    @Test
    void testStartOldCheckpointForSameSubtask() throws Exception {
        this.testStartOldCheckpointAfterNewCheckpointAborted(false);
    }

    @Test
    void testStartOldCheckpointForDifferentSubtask() throws Exception {
        this.testStartOldCheckpointAfterNewCheckpointAborted(true);
    }

    private void testStartOldCheckpointAfterNewCheckpointAborted(boolean isDifferentSubtask) throws Exception {
        ChannelStateWriteRequestDispatcherImpl processor = new ChannelStateWriteRequestDispatcherImpl(() -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID), (ChannelStateSerializer)new ChannelStateSerializerImpl());
        processor.dispatch(ChannelStateWriteRequest.registerSubtask((JobVertexID)JOB_VERTEX_ID, (int)0));
        JobVertexID newJobVertex = JOB_VERTEX_ID;
        if (isDifferentSubtask) {
            newJobVertex = new JobVertexID();
            processor.dispatch(ChannelStateWriteRequest.registerSubtask((JobVertexID)newJobVertex, (int)0));
        }
        processor.dispatch(ChannelStateWriteRequest.abort((JobVertexID)JOB_VERTEX_ID, (int)0, (long)2L, (Throwable)new TestException()));
        ChannelStateWriter.ChannelStateWriteResult result = new ChannelStateWriter.ChannelStateWriteResult();
        processor.dispatch(ChannelStateWriteRequest.start((JobVertexID)newJobVertex, (int)0, (long)1L, (ChannelStateWriter.ChannelStateWriteResult)result, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
        ChannelStateWriteResultUtil.assertCheckpointFailureReason(result, CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED);
    }

    @Test
    void testAbortCheckpointAndCheckAllException() throws Exception {
        this.testAbortCheckpointAndCheckAllException(1);
        this.testAbortCheckpointAndCheckAllException(2);
        this.testAbortCheckpointAndCheckAllException(3);
        this.testAbortCheckpointAndCheckAllException(5);
        this.testAbortCheckpointAndCheckAllException(10);
    }

    private void testAbortCheckpointAndCheckAllException(int numberOfSubtask) throws Exception {
        ChannelStateWriter.ChannelStateWriteResult result;
        int i;
        ChannelStateWriteRequestDispatcherImpl processor = new ChannelStateWriteRequestDispatcherImpl(() -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID), (ChannelStateSerializer)new ChannelStateSerializerImpl());
        ArrayList<ChannelStateWriter.ChannelStateWriteResult> results = new ArrayList<ChannelStateWriter.ChannelStateWriteResult>(numberOfSubtask);
        for (int i2 = 0; i2 < numberOfSubtask; ++i2) {
            processor.dispatch(ChannelStateWriteRequest.registerSubtask((JobVertexID)JOB_VERTEX_ID, (int)i2));
        }
        long checkpointId = 1L;
        int abortedSubtaskIndex = new Random().nextInt(numberOfSubtask);
        processor.dispatch(ChannelStateWriteRequest.abort((JobVertexID)JOB_VERTEX_ID, (int)abortedSubtaskIndex, (long)checkpointId, (Throwable)new TestException()));
        for (i = 0; i < numberOfSubtask; ++i) {
            result = new ChannelStateWriter.ChannelStateWriteResult();
            results.add(result);
            processor.dispatch(ChannelStateWriteRequest.start((JobVertexID)JOB_VERTEX_ID, (int)i, (long)checkpointId, (ChannelStateWriter.ChannelStateWriteResult)result, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
        }
        Assertions.assertThat(results).allMatch(ChannelStateWriter.ChannelStateWriteResult::isDone);
        for (i = 0; i < numberOfSubtask; ++i) {
            result = (ChannelStateWriter.ChannelStateWriteResult)results.get(i);
            if (i == abortedSubtaskIndex) {
                ChannelStateWriteResultUtil.assertHasSpecialCause(result, TestException.class);
                continue;
            }
            ChannelStateWriteResultUtil.assertCheckpointFailureReason(result, CheckpointFailureReason.CHANNEL_STATE_SHARED_STREAM_EXCEPTION);
        }
    }

    private NetworkBuffer buffer() {
        return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment((int)10), FreeingBufferRecycler.INSTANCE);
    }
}

