/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.function.IntSupplier;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.configuration.SinkOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Shared test cases for the operator created by {@link CommitterOperatorFactory}.
 *
 * <p>Concrete subclasses supply the sinks under test through {@link #sinkWithPostCommit()},
 * {@link #sinkWithPostCommitWithRetry()} and {@link #sinkWithoutPostCommit()}. Every test drives
 * the operator through a {@link OneInputStreamOperatorTestHarness} that is managed by
 * try-with-resources, so the harness is closed even when an assertion fails.
 */
abstract class CommitterOperatorTestBase {
    CommitterOperatorTestBase() {
    }

    /**
     * Sends one summary and one committable for checkpoint 1, completes that checkpoint and
     * verifies that exactly one commit happened. With a post-commit topology the summary and the
     * committable must be forwarded; without one the operator must emit nothing.
     *
     * @param withPostCommitTopology whether the sink is taken from {@link #sinkWithPostCommit()}
     *     ({@code true}) or from {@link #sinkWithoutPostCommit()} ({@code false})
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
        SinkAndCounters sinkAndCounters =
                withPostCommitTopology ? sinkWithPostCommit() : sinkWithoutPostCommit();
        try (OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
                testHarness = createTestHarness(sinkAndCounters.sink, false, true)) {
            testHarness.open();

            CommittableSummary<String> committableSummary =
                    new CommittableSummary<>(1, 1, 1L, 1, 1, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary));
            CommittableWithLineage<String> committableWithLineage =
                    new CommittableWithLineage<>("1", 1L, 1);
            testHarness.processElement(new StreamRecord<>(committableWithLineage));

            testHarness.notifyOfCompletedCheckpoint(1L);

            Assertions.assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
            if (withPostCommitTopology) {
                ListAssert<CommittableMessage<String>> records =
                        Assertions.assertThat(testHarness.extractOutputValues()).hasSize(2);
                records.element(0, Assertions.as(SinkV2Assertions.committableSummary()))
                        .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
                        .hasOverallCommittables(committableSummary.getNumberOfCommittables())
                        .hasPendingCommittables(0);
                // The emitted committable is expected to carry subtask id 0 (the harness subtask).
                records.element(1, Assertions.as(SinkV2Assertions.committableWithLineage()))
                        .isEqualTo(committableWithLineage.withSubtaskId(0));
            } else {
                Assertions.assertThat(testHarness.getOutput()).isEmpty();
            }
        }
    }

    /**
     * Announces two committables in the summary but delivers only one before the checkpoint
     * completes: the notification must fail and nothing may be committed or emitted. After the
     * second committable arrives, the same notification must succeed, commit both and emit the
     * summary followed by both committables.
     */
    @Test
    void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
        try (OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
                testHarness = createTestHarness(sinkAndCounters.sink, false, true, 1, 1, 0)) {
            testHarness.open();
            testHarness.setProcessingTime(0L);

            CommittableSummary<String> committableSummary =
                    new CommittableSummary<>(1, 1, 1L, 2, 2, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary));
            CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 1L, 1);
            testHarness.processElement(new StreamRecord<>(first));

            // Only one of the two announced committables has arrived so far.
            Assertions.assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1L))
                    .hasMessageContaining("Trying to commit incomplete batch of committables");
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
            Assertions.assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero();

            CommittableWithLineage<String> second = new CommittableWithLineage<>("2", 1L, 1);
            testHarness.processElement(new StreamRecord<>(second));

            Assertions.assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1L))
                    .doesNotThrowAnyException();
            Assertions.assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);

            ListAssert<CommittableMessage<String>> records =
                    Assertions.assertThat(testHarness.extractOutputValues()).hasSize(3);
            records.element(0, Assertions.as(SinkV2Assertions.committableSummary()))
                    .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
                    .hasOverallCommittables(committableSummary.getNumberOfCommittables())
                    .hasPendingCommittables(0);
            records.element(1, Assertions.as(SinkV2Assertions.committableWithLineage()))
                    .isEqualTo(first.withSubtaskId(0));
            records.element(2, Assertions.as(SinkV2Assertions.committableWithLineage()))
                    .isEqualTo(second.withSubtaskId(0));
        }
    }

    /**
     * Snapshots an operator holding two uncommitted committables for checkpoint 0, restores that
     * state into an operator with a different parallelism / subtask index and verifies that
     * opening the restored operator commits both committables and re-emits them under the new
     * subtask id.
     *
     * @param parallelismBeforeScaling parallelism (and max parallelism) of the snapshotting harness
     * @param parallelismAfterScaling parallelism (and max parallelism) of the restoring harness
     * @param subtaskIdAfterRecovery subtask index of the restoring harness
     */
    @ParameterizedTest
    @CsvSource(value = {"1, 10, 9", "2, 1, 0", "2, 2, 1"})
    void testStateRestoreWithScaling(
            int parallelismBeforeScaling, int parallelismAfterScaling, int subtaskIdAfterRecovery)
            throws Exception {
        long checkpointId = 0L;
        CommittableWithLineage<String> first = new CommittableWithLineage<>("1", checkpointId, 0);
        CommittableWithLineage<String> second = new CommittableWithLineage<>("2", checkpointId, 1);

        final OperatorSubtaskState snapshot;
        try (OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
                testHarness =
                        createTestHarness(
                                sinkWithPostCommitWithRetry().sink,
                                false,
                                true,
                                parallelismBeforeScaling,
                                parallelismBeforeScaling,
                                0)) {
            testHarness.open();

            CommittableSummary<String> firstSummary =
                    new CommittableSummary<>(0, parallelismBeforeScaling, checkpointId, 1, 1, 0);
            testHarness.processElement(new StreamRecord<>(firstSummary));
            testHarness.processElement(new StreamRecord<>(first));

            CommittableSummary<String> secondSummary =
                    new CommittableSummary<>(1, parallelismBeforeScaling, checkpointId, 1, 1, 0);
            testHarness.processElement(new StreamRecord<>(secondSummary));
            testHarness.processElement(new StreamRecord<>(second));

            snapshot = testHarness.snapshot(checkpointId, 2L);
            // No checkpoint completion was signalled, so nothing may have been emitted yet.
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
        }

        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
        try (OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
                restored =
                        createTestHarness(
                                sinkAndCounters.sink,
                                false,
                                true,
                                parallelismAfterScaling,
                                parallelismAfterScaling,
                                subtaskIdAfterRecovery)) {
            restored.initializeState(snapshot);
            restored.open();

            // Both restored committables are expected to be committed as part of open().
            Assertions.assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);

            ListAssert<CommittableMessage<String>> records =
                    Assertions.assertThat(restored.extractOutputValues()).hasSize(3);
            records.element(0, Assertions.as(SinkV2Assertions.committableSummary()))
                    .hasCheckpointId(checkpointId)
                    .hasSubtaskId(subtaskIdAfterRecovery)
                    .hasFailedCommittables(0)
                    .hasOverallCommittables(2)
                    .hasPendingCommittables(0)
                    .hasNumberOfSubtasks(Math.min(parallelismBeforeScaling, parallelismAfterScaling));
            records.element(1, Assertions.as(SinkV2Assertions.committableWithLineage()))
                    .hasCheckpointId(checkpointId)
                    .hasSubtaskId(subtaskIdAfterRecovery)
                    .hasCommittable(first.getCommittable());
            records.element(2, Assertions.as(SinkV2Assertions.committableWithLineage()))
                    .hasCheckpointId(checkpointId)
                    .hasSubtaskId(subtaskIdAfterRecovery)
                    .hasCommittable(second.getCommittable());
        }
    }

    /**
     * Configures {@link SinkOptions#COMMITTER_RETRIES} and commits a single committable through
     * the sink from {@link #sinkWithPostCommitWithRetry()}: with zero retries the checkpoint
     * notification must fail, with one retry it must succeed.
     *
     * @param numRetries value written to {@link SinkOptions#COMMITTER_RETRIES}
     */
    @ParameterizedTest
    @ValueSource(ints = {0, 1})
    void testNumberOfRetries(int numRetries) throws Exception {
        try (OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
                testHarness =
                        createTestHarness(sinkWithPostCommitWithRetry().sink, false, true, 1, 1, 0)) {
            // The option has to be set before open() so the operator sees it during setup.
            testHarness.getStreamConfig().getConfiguration().set(SinkOptions.COMMITTER_RETRIES, numRetries);
            testHarness.open();

            long checkpointId = 1L;
            testHarness.processElement(
                    new StreamRecord<>(new CommittableSummary<>(0, 1, checkpointId, 1, 0, 0)));
            testHarness.processElement(
                    new StreamRecord<>(new CommittableWithLineage<>("1", checkpointId, 0)));

            AbstractThrowableAssert<?, ? extends Throwable> throwableAssert =
                    Assertions.assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(checkpointId));
            if (numRetries == 0) {
                throwableAssert.hasMessageContaining("Failed to commit 1 committables");
            } else {
                throwableAssert.doesNotThrowAnyException();
            }
        }
    }

    /**
     * Ends the input in streaming mode after one summary and one committable were received and
     * verifies that both are emitted (after the checkpoint notification when checkpointing is
     * enabled). A later checkpoint notification and a second {@code endInput()} must not add
     * further output.
     *
     * @param isCheckpointingEnabled forwarded to {@link CommitterOperatorFactory}; also decides
     *     whether checkpoint 1 is explicitly completed after the input ended
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception {
        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
        try (OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
                testHarness = createTestHarness(sinkAndCounters.sink, false, isCheckpointingEnabled)) {
            testHarness.open();

            CommittableSummary<String> committableSummary =
                    new CommittableSummary<>(1, 1, 1L, 1, 1, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary));
            CommittableWithLineage<String> committableWithLineage =
                    new CommittableWithLineage<>("1", 1L, 1);
            testHarness.processElement(new StreamRecord<>(committableWithLineage));

            testHarness.endInput();
            if (isCheckpointingEnabled) {
                testHarness.notifyOfCompletedCheckpoint(1L);
            }

            ListAssert<CommittableMessage<String>> records =
                    Assertions.assertThat(testHarness.extractOutputValues()).hasSize(2);
            records.element(0, Assertions.as(SinkV2Assertions.committableSummary()))
                    .hasCheckpointId(1L)
                    .hasPendingCommittables(0)
                    .hasOverallCommittables(1)
                    .hasFailedCommittables(0);
            records.element(1, Assertions.as(SinkV2Assertions.committableWithLineage()))
                    .isEqualTo(committableWithLineage.withSubtaskId(0));

            // Later notifications / end-of-input signals must not emit anything new.
            testHarness.notifyOfCompletedCheckpoint(2L);
            testHarness.endInput();
            Assertions.assertThat(testHarness.getOutput()).hasSize(2);
        }
    }

    /**
     * Runs the operator in batch mode with a sink from {@link #sinkWithoutPostCommit()}: ending
     * the input must trigger exactly one commit and produce no output.
     */
    @Test
    void testEmitCommittablesBatch() throws Exception {
        SinkAndCounters sinkAndCounters = sinkWithoutPostCommit();
        try (OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
                testHarness = createTestHarness(sinkAndCounters.sink, true, false)) {
            testHarness.open();

            // The batch-mode test tags its committables with Long.MAX_VALUE as checkpoint id.
            long checkpointId = Long.MAX_VALUE;
            CommittableSummary<String> committableSummary =
                    new CommittableSummary<>(1, 1, checkpointId, 1, 0, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary));
            CommittableWithLineage<String> committableWithLineage =
                    new CommittableWithLineage<>("1", checkpointId, 1);
            testHarness.processElement(new StreamRecord<>(committableWithLineage));

            testHarness.endInput();

            Assertions.assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
        }
    }

    /**
     * Creates a harness around a new {@link CommitterOperatorFactory} using the harness' own
     * default parallelism settings (single-argument harness constructor).
     *
     * @param sink sink whose committer is exercised
     * @param isBatchMode forwarded to {@link CommitterOperatorFactory}
     * @param isCheckpointingEnabled forwarded to {@link CommitterOperatorFactory}
     * @return an unopened test harness; the caller is responsible for closing it
     */
    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
            createTestHarness(
                    SupportsCommitter<String> sink, boolean isBatchMode, boolean isCheckpointingEnabled)
                    throws Exception {
        return new OneInputStreamOperatorTestHarness<>(
                new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled));
    }

    /**
     * Creates a harness around a new {@link CommitterOperatorFactory} with explicit parallelism
     * settings.
     *
     * @param sink sink whose committer is exercised
     * @param isBatchMode forwarded to {@link CommitterOperatorFactory}
     * @param isCheckpointingEnabled forwarded to {@link CommitterOperatorFactory}
     * @param maxParallelism max parallelism passed to the harness
     * @param parallelism parallelism passed to the harness
     * @param subtaskId subtask index passed to the harness
     * @return an unopened test harness; the caller is responsible for closing it
     */
    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
            createTestHarness(
                    SupportsCommitter<String> sink,
                    boolean isBatchMode,
                    boolean isCheckpointingEnabled,
                    int maxParallelism,
                    int parallelism,
                    int subtaskId)
                    throws Exception {
        return new OneInputStreamOperatorTestHarness<>(
                new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled),
                maxParallelism,
                parallelism,
                subtaskId);
    }

    /**
     * Supplies the sink for tests that expect committables to be forwarded downstream, together
     * with its commit counter.
     */
    abstract SinkAndCounters sinkWithPostCommit();

    /**
     * Supplies the sink used by the retry tests; {@link #testNumberOfRetries(int)} expects its
     * first commit attempt to need one retry before succeeding.
     */
    abstract SinkAndCounters sinkWithPostCommitWithRetry();

    /**
     * Supplies the sink for tests that expect the operator to emit nothing after committing,
     * together with its commit counter.
     */
    abstract SinkAndCounters sinkWithoutPostCommit();

    /** Pairs a sink under test with a supplier of the number of commits it has observed. */
    static class SinkAndCounters {
        /** Sink handed to {@link CommitterOperatorFactory}. */
        SupportsCommitter<String> sink;

        /** Supplies the commit count that the tests compare against their expectations. */
        IntSupplier commitCounter;

        /**
         * @param sink sink handed to {@link CommitterOperatorFactory}
         * @param commitCounter supplier of the number of commits performed through {@code sink}
         */
        public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier commitCounter) {
            this.sink = sink;
            this.commitCounter = commitCounter;
        }
    }
}

