package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.fs.local.LocalRecoverableWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpCommitter;
import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverable;
import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableFsDataOutputStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableWriter;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.HamcrestCondition;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.class */
class BucketTest {

    /** Temporary root directory injected by JUnit; the tests create their own sub-folders inside it. */
    @TempDir
    private static Path tempFolder;

    /** Bucket id passed to {@code Bucket.getNew} by {@link #createBucket}. */
    private static final String bucketId = "testing-bucket";

    /** Rolling policy shared by all buckets built in this test; uses the builder defaults. */
    private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().build();

    /** Encoder shared by all bucket writers built in this test; typed to avoid raw-type usage. */
    private static final Encoder<String> ENCODER = new SimpleStringEncoder<>();

    /**
     * Stub writer built on {@link NoOpRecoverableWriter} that counts how often
     * {@code supportsResume()}, {@code recover(...)} and {@code recoverForCommit(...)} are invoked,
     * and reports a fixed answer for {@code supportsResume()}.
     */
    public static class BaseStubWriter extends NoOpRecoverableWriter {

        /** Fixed answer returned by {@link #supportsResume()}. */
        private final boolean supportsResume;

        /** Number of {@link #supportsResume()} invocations so far. */
        private int supportsResumeCallCounter = 0;

        /** Number of {@code recover(...)} invocations so far. */
        private int recoverCallCounter = 0;

        /** Number of {@code recoverForCommit(...)} invocations so far. */
        private int recoverForCommitCallCounter = 0;

        /**
         * @param resumable the value that {@link #supportsResume()} will report
         */
        private BaseStubWriter(boolean resumable) {
            this.supportsResume = resumable;
        }

        /** @return how many times {@link #supportsResume()} has been called */
        int getSupportsResumeCallCounter() {
            return supportsResumeCallCounter;
        }

        /** @return how many times {@code recover(...)} has been called */
        int getRecoverCallCounter() {
            return recoverCallCounter;
        }

        /** @return how many times {@code recoverForCommit(...)} has been called */
        int getRecoverForCommitCallCounter() {
            return recoverForCommitCallCounter;
        }

        /** Records the call and hands back a fresh no-op output stream. */
        @Override
        public RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverable resumeRecoverable) throws IOException {
            recoverCallCounter += 1;
            return new NoOpRecoverableFsDataOutputStream();
        }

        /**
         * Records the call and hands back a fresh no-op committer.
         *
         * <p>The argument check runs before the counter is bumped, so a rejected argument is not counted.
         */
        @Override
        public RecoverableFsDataOutputStream.Committer recoverForCommit(RecoverableWriter.CommitRecoverable commitRecoverable) throws IOException {
            Preconditions.checkArgument(commitRecoverable instanceof NoOpRecoverable);
            recoverForCommitCallCounter += 1;
            return new NoOpCommitter();
        }

        /** Records the call and returns the value fixed at construction time. */
        @Override
        public boolean supportsResume() {
            supportsResumeCallCounter += 1;
            return supportsResume;
        }
    }

    /** A {@link BaseStubWriter} whose {@code supportsResume()} always answers {@code false}. */
    private static class StubNonResumableWriter extends BaseStubWriter {
        StubNonResumableWriter() {
            // false => this writer reports that it cannot resume in-progress files.
            super(false);
        }
    }

    /** A {@link BaseStubWriter} whose {@code supportsResume()} always answers {@code true}. */
    private static class StubResumableWriter extends BaseStubWriter {
        StubResumableWriter() {
            // true => this writer reports that it can resume in-progress files.
            super(true);
        }
    }

    /**
     * A {@link LocalRecoverableWriter} that declares it requires cleanup of recoverable state and
     * counts every {@code cleanupRecoverableState(...)} invocation instead of cleaning anything up.
     */
    public static class TestRecoverableWriter extends LocalRecoverableWriter {

        /** Number of {@code cleanupRecoverableState(...)} invocations so far. */
        private int cleanupCallCounter = 0;

        /**
         * @param localFileSystem the local file system handed to the parent writer
         */
        TestRecoverableWriter(LocalFileSystem localFileSystem) {
            super(localFileSystem);
        }

        /** @return how many times {@code cleanupRecoverableState(...)} has been called */
        int getCleanupCallCounter() {
            return cleanupCallCounter;
        }

        /** Always reports that recoverable state has to be cleaned up. */
        public boolean requiresCleanupOfRecoverableState() {
            return true;
        }

        /**
         * Only records the call; nothing is actually removed.
         *
         * @return always {@code false}
         */
        public boolean cleanupRecoverableState(RecoverableWriter.ResumeRecoverable resumeRecoverable) throws IOException {
            cleanupCallCounter += 1;
            return false;
        }

        /** Renders the current cleanup call count, for use in assertion failure output. */
        public String toString() {
            final StringBuilder text = new StringBuilder();
            text.append("TestRecoverableWriter has called discardRecoverableState() ");
            text.append(cleanupCallCounter);
            text.append(" times.");
            return text.toString();
        }
    }

    /** Package-private no-argument constructor; performs no set-up. */
    BucketTest() {
    }

    /**
     * Writes one element, snapshots the bucket for checkpoint 0 and acknowledges that same
     * checkpoint; the writer must not have received any cleanup call afterwards.
     */
    @Test
    void shouldNotCleanupResumablesThatArePartOfTheAckedCheckpoint() throws IOException {
        final org.apache.flink.core.fs.Path bucketPath =
                new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(tempFolder).toURI());
        final TestRecoverableWriter writer = getRecoverableWriter(bucketPath);
        final OutputFileConfig fileConfig = OutputFileConfig.builder().build();
        final Bucket<String, String> bucketUnderTest = createBucket(writer, bucketPath, 0, 0, fileConfig);

        bucketUnderTest.write("test-element", 0L);

        // The snapshot taken for checkpoint 0 must still reference the in-progress file.
        final BucketState<String> stateAtCheckpoint = bucketUnderTest.onReceptionOfCheckpoint(0L);
        Assertions.assertThat(stateAtCheckpoint)
                .is(HamcrestCondition.matching(hasActiveInProgressFile()));

        bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);

        Assertions.assertThat(writer).is(HamcrestCondition.matching(hasCalledDiscard(0)));
    }

    /**
     * Writes one element, snapshots and acknowledges checkpoint 0, then snapshots checkpoints 1
     * and 2 and acknowledges checkpoint 2; the writer is expected to have seen exactly two
     * cleanup calls by then.
     */
    @Test
    void shouldCleanupOutdatedResumablesOnCheckpointAck() throws IOException {
        final org.apache.flink.core.fs.Path bucketPath =
                new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(tempFolder).toURI());
        final TestRecoverableWriter writer = getRecoverableWriter(bucketPath);
        final OutputFileConfig fileConfig = OutputFileConfig.builder().build();
        final Bucket<String, String> bucketUnderTest = createBucket(writer, bucketPath, 0, 0, fileConfig);

        bucketUnderTest.write("test-element", 0L);

        final BucketState<String> stateAtCheckpoint = bucketUnderTest.onReceptionOfCheckpoint(0L);
        Assertions.assertThat(stateAtCheckpoint)
                .is(HamcrestCondition.matching(hasActiveInProgressFile()));
        bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);

        // Two further snapshots are taken before the later one is acknowledged.
        bucketUnderTest.onReceptionOfCheckpoint(1L);
        bucketUnderTest.onReceptionOfCheckpoint(2L);
        bucketUnderTest.onSuccessfulCompletionOfCheckpoint(2L);

        Assertions.assertThat(writer).is(HamcrestCondition.matching(hasCalledDiscard(2)));
    }

    /**
     * Snapshots checkpoints 0, 1 and 2 on a bucket that never received an element and then
     * acknowledges checkpoint 2; no cleanup call is expected on the writer.
     */
    @Test
    void shouldNotCallCleanupWithoutInProgressPartFiles() throws Exception {
        final org.apache.flink.core.fs.Path bucketPath =
                new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(tempFolder).toURI());
        final TestRecoverableWriter writer = getRecoverableWriter(bucketPath);
        final OutputFileConfig fileConfig = OutputFileConfig.builder().build();
        final Bucket<String, String> bucketUnderTest = createBucket(writer, bucketPath, 0, 0, fileConfig);

        // Nothing was written, so the first snapshot must not carry an in-progress file.
        final BucketState<String> stateAtCheckpoint = bucketUnderTest.onReceptionOfCheckpoint(0L);
        Assertions.assertThat(stateAtCheckpoint)
                .is(HamcrestCondition.matching(hasNoActiveInProgressFile()));

        bucketUnderTest.onReceptionOfCheckpoint(1L);
        bucketUnderTest.onReceptionOfCheckpoint(2L);
        bucketUnderTest.onSuccessfulCompletionOfCheckpoint(2L);

        Assertions.assertThat(writer).is(HamcrestCondition.matching(hasCalledDiscard(0)));
    }

    /**
     * Snapshots a bucket with an in-progress file at checkpoint 0, restores a second bucket from
     * that snapshot using a fresh writer, and checks that acknowledging checkpoint 1 on the
     * restored bucket results in exactly one cleanup call on the fresh writer.
     */
    @Test
    void shouldCleanupOutdatedResumablesAfterResumed() throws Exception {
        final org.apache.flink.core.fs.Path bucketPath =
                new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(tempFolder).toURI());
        final TestRecoverableWriter originalWriter = getRecoverableWriter(bucketPath);
        final Bucket<String, String> originalBucket =
                createBucket(originalWriter, bucketPath, 0, 0, OutputFileConfig.builder().build());

        originalBucket.write("test-element", 0L);

        // Parameterized (not raw) so that the matcher and restoreBucket(...) type-check without
        // unchecked conversions.
        final BucketState<String> stateToRestore = originalBucket.onReceptionOfCheckpoint(0L);
        Assertions.assertThat(stateToRestore)
                .is(HamcrestCondition.matching(hasActiveInProgressFile()));

        originalBucket.onSuccessfulCompletionOfCheckpoint(0L);
        Assertions.assertThat(originalWriter).is(HamcrestCondition.matching(hasCalledDiscard(0)));

        // The restored bucket gets its own writer (rooted in a new folder) so that its cleanup
        // calls are counted separately from the original writer's.
        final org.apache.flink.core.fs.Path restorePath =
                new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(tempFolder).toURI());
        final TestRecoverableWriter restoredWriter = getRecoverableWriter(restorePath);
        final Bucket<String, String> restoredBucket =
                restoreBucket(restoredWriter, 0, 0L, stateToRestore, OutputFileConfig.builder().build());

        final BucketState<String> stateAfterRestore = restoredBucket.onReceptionOfCheckpoint(1L);
        Assertions.assertThat(stateAfterRestore)
                .is(HamcrestCondition.matching(hasActiveInProgressFile()));

        restoredBucket.onSuccessfulCompletionOfCheckpoint(1L);
        Assertions.assertThat(restoredWriter).is(HamcrestCondition.matching(hasCalledDiscard(1)));
    }

    /**
     * Restores a bucket that only has an in-progress part using a writer without resume support.
     * Expects one {@code supportsResume()} call, no {@code recover(...)} call, one
     * {@code recoverForCommit(...)} call, and no in-progress part on the restored bucket.
     */
    @Test
    void inProgressFileShouldBeCommittedIfWriterDoesNotSupportResume() throws IOException {
        final StubNonResumableWriter writer = new StubNonResumableWriter();

        final Bucket<String, String> restored = getRestoredBucketWithOnlyInProgressPart(writer);

        Assertions.assertThat(writer)
                .is(HamcrestCondition.matching(hasMethodCallCountersEqualTo(1, 0, 1)));
        Assertions.assertThat(restored)
                .is(HamcrestCondition.matching(hasNullInProgressFile(true)));
    }

    /**
     * Restores a bucket that only has an in-progress part using a writer with resume support.
     * Expects one {@code supportsResume()} call, one {@code recover(...)} call, no
     * {@code recoverForCommit(...)} call, and a non-null in-progress part on the restored bucket.
     */
    @Test
    void inProgressFileShouldBeRestoredIfWriterSupportsResume() throws IOException {
        final StubResumableWriter writer = new StubResumableWriter();

        final Bucket<String, String> restored = getRestoredBucketWithOnlyInProgressPart(writer);

        Assertions.assertThat(writer)
                .is(HamcrestCondition.matching(hasMethodCallCountersEqualTo(1, 1, 0)));
        Assertions.assertThat(restored)
                .is(HamcrestCondition.matching(hasNullInProgressFile(false)));
    }

    /**
     * Restores a bucket whose state holds ten pending parts and no in-progress part.
     * Expects no {@code supportsResume()} or {@code recover(...)} calls, ten
     * {@code recoverForCommit(...)} calls, and no in-progress part on the restored bucket.
     */
    @Test
    void pendingFilesShouldBeRestored() throws IOException {
        final StubNonResumableWriter writer = new StubNonResumableWriter();

        final Bucket<String, String> restored = getRestoredBucketWithOnlyPendingParts(writer, 10);

        Assertions.assertThat(writer)
                .is(HamcrestCondition.matching(hasMethodCallCountersEqualTo(0, 0, 10)));
        Assertions.assertThat(restored)
                .is(HamcrestCondition.matching(hasNullInProgressFile(true)));
    }

    /**
     * Builds a matcher that accepts a {@link TestRecoverableWriter} whose cleanup call counter
     * equals the given value.
     *
     * @param i the expected number of cleanup calls
     * @return the matcher
     */
    private static TypeSafeMatcher<TestRecoverableWriter> hasCalledDiscard(final int i) {
        return new TypeSafeMatcher<TestRecoverableWriter>() {
            public boolean matchesSafely(TestRecoverableWriter writer) {
                final int observedCalls = writer.getCleanupCallCounter();
                return observedCalls == i;
            }

            public void describeTo(Description description) {
                description
                        .appendText("the TestRecoverableWriter to have called discardRecoverableState() ")
                        .appendValue(Integer.valueOf(i))
                        .appendText(" times.");
            }
        };
    }

    /**
     * Builds a matcher that accepts a {@link BucketState} whose in-progress file recoverable is
     * non-null.
     *
     * @return the matcher
     */
    private static TypeSafeMatcher<BucketState<String>> hasActiveInProgressFile() {
        return new TypeSafeMatcher<BucketState<String>>() {
            public boolean matchesSafely(BucketState<String> state) {
                final boolean inProgressFilePresent = state.getInProgressFileRecoverable() != null;
                return inProgressFilePresent;
            }

            public void describeTo(Description description) {
                final String expectation = "a BucketState with active in-progress file.";
                description.appendText(expectation);
            }
        };
    }

    /**
     * Builds a matcher that accepts a {@link BucketState} whose in-progress file recoverable is
     * {@code null}.
     *
     * @return the matcher
     */
    private static TypeSafeMatcher<BucketState<String>> hasNoActiveInProgressFile() {
        return new TypeSafeMatcher<BucketState<String>>() {
            public boolean matchesSafely(BucketState<String> state) {
                final boolean inProgressFileAbsent = state.getInProgressFileRecoverable() == null;
                return inProgressFileAbsent;
            }

            public void describeTo(Description description) {
                final String expectation = "a BucketState with no active in-progress file.";
                description.appendText(expectation);
            }
        };
    }

    /**
     * Builds a matcher on the nullness of a {@link Bucket}'s in-progress part.
     *
     * @param isNull {@code true} to accept buckets whose in-progress part is {@code null},
     *     {@code false} to accept buckets whose in-progress part is non-null
     * @return the matcher
     */
    private static TypeSafeMatcher<Bucket<String, String>> hasNullInProgressFile(final boolean isNull) {
        return new TypeSafeMatcher<Bucket<String, String>>() {
            public boolean matchesSafely(Bucket<String, String> bucket) {
                return isNull == (bucket.getInProgressPart() == null);
            }

            public void describeTo(Description description) {
                // The prefix already ends with a space, so the suffix must not start with one.
                description
                        .appendText("a Bucket with its inProgressPart being ")
                        .appendText(isNull ? "null." : "not null.");
            }
        };
    }

    /**
     * Builds a matcher that accepts a {@link BaseStubWriter} whose three call counters equal the
     * given values.
     *
     * @param supportsResumeCalls expected number of {@code supportsResume()} calls
     * @param recoverCalls expected number of {@code recover(...)} calls
     * @param recoverForCommitCalls expected number of {@code recoverForCommit(...)} calls
     * @return the matcher
     */
    private static TypeSafeMatcher<BaseStubWriter> hasMethodCallCountersEqualTo(
            final int supportsResumeCalls, final int recoverCalls, final int recoverForCommitCalls) {
        return new TypeSafeMatcher<BaseStubWriter>() {
            public boolean matchesSafely(BaseStubWriter writer) {
                return writer.getSupportsResumeCallCounter() == supportsResumeCalls
                        && writer.getRecoverCallCounter() == recoverCalls
                        && writer.getRecoverForCommitCallCounter() == recoverForCommitCalls;
            }

            public void describeTo(Description description) {
                // The description ends at the final period; no trailing quote character is emitted.
                description
                        .appendText("a Writer where:")
                        .appendText(" supportsResume was called ")
                        .appendValue(Integer.valueOf(supportsResumeCalls))
                        .appendText(" times,")
                        .appendText(" recover was called ")
                        .appendValue(Integer.valueOf(recoverCalls))
                        .appendText(" times,")
                        .appendText(" and recoverForCommit was called ")
                        .appendValue(Integer.valueOf(recoverForCommitCalls))
                        .appendText(" times.");
            }
        };
    }

    /**
     * Creates a fresh bucket through {@code Bucket.getNew}, using the shared {@code bucketId},
     * {@code ENCODER} and {@code rollingPolicy} and no file life-cycle listener.
     *
     * @param recoverableWriter the writer wrapped by the bucket's row-wise bucket writer
     * @param path the bucket path forwarded to {@code Bucket.getNew}
     * @param i the first argument forwarded to {@code Bucket.getNew} (presumably the subtask index)
     * @param i2 the fourth argument forwarded to {@code Bucket.getNew} (presumably the initial
     *     part counter)
     * @param outputFileConfig the output file configuration forwarded to {@code Bucket.getNew}
     * @return the newly created bucket
     * @throws IOException declared for parity with the original signature
     */
    private static Bucket<String, String> createBucket(RecoverableWriter recoverableWriter, org.apache.flink.core.fs.Path path, int i, int i2, OutputFileConfig outputFileConfig) throws IOException {
        // Parameterized instead of raw so the call below is checked by the compiler.
        final RowWiseBucketWriter<String, String> bucketWriter =
                new RowWiseBucketWriter<>(recoverableWriter, ENCODER);
        // No life-cycle listener is registered; the typed local replaces a raw-cast null.
        final FileLifeCycleListener<String> noListener = null;
        return Bucket.getNew(i, bucketId, path, i2, bucketWriter, rollingPolicy, noListener, outputFileConfig);
    }

    /**
     * Restores a bucket from a snapshot through {@code Bucket.restore}, using the shared
     * {@code ENCODER} and {@code rollingPolicy} and no file life-cycle listener.
     *
     * @param recoverableWriter the writer wrapped by the bucket's row-wise bucket writer
     * @param i the first argument forwarded to {@code Bucket.restore} (presumably the subtask
     *     index)
     * @param j the second argument forwarded to {@code Bucket.restore} (presumably the initial
     *     max part counter)
     * @param bucketState the snapshot to restore from
     * @param outputFileConfig the output file configuration forwarded to {@code Bucket.restore}
     * @return the restored bucket
     * @throws Exception declared for parity with the original signature
     */
    private static Bucket<String, String> restoreBucket(RecoverableWriter recoverableWriter, int i, long j, BucketState<String> bucketState, OutputFileConfig outputFileConfig) throws Exception {
        // Parameterized instead of raw so the call below is checked by the compiler.
        final RowWiseBucketWriter<String, String> bucketWriter =
                new RowWiseBucketWriter<>(recoverableWriter, ENCODER);
        // No life-cycle listener is registered; the typed local replaces a raw-cast null.
        final FileLifeCycleListener<String> noListener = null;
        return Bucket.restore(i, j, bucketWriter, rollingPolicy, bucketState, noListener, outputFileConfig);
    }

    /**
     * Resolves the file system for the given path, asserts that it is a {@link LocalFileSystem},
     * and wraps it in a {@link TestRecoverableWriter}.
     *
     * @param path the path whose URI is used to look up the file system
     * @return a counting writer backed by the local file system
     * @throws IOException if the file system lookup fails
     */
    private static TestRecoverableWriter getRecoverableWriter(org.apache.flink.core.fs.Path path) throws IOException {
        // Keep the general type here: the assertion below is what establishes that the
        // resolved file system really is local, so the cast must come after it.
        final FileSystem fs = FileSystem.get(path.toUri());
        Assertions.assertThat(fs)
                .as("Expected Local FS but got a %s for path: %s", fs.getClass().getName(), path)
                .isInstanceOf(LocalFileSystem.class);
        return new TestRecoverableWriter((LocalFileSystem) fs);
    }

    /**
     * Restores a bucket from a hand-built state that carries one in-progress file recoverable and
     * an empty map of pending parts.
     *
     * @param baseStubWriter the stub writer wrapped by the bucket's row-wise bucket writer
     * @return the restored bucket
     * @throws IOException if restoring the bucket fails
     */
    private Bucket<String, String> getRestoredBucketWithOnlyInProgressPart(BaseStubWriter baseStubWriter) throws IOException {
        final InProgressFileWriter.InProgressFileRecoverable inProgressFile =
                new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(new NoOpRecoverable());
        // Parameterized instead of raw; the empty map means "no pending parts".
        final BucketState<String> stateToRestore =
                new BucketState<>("test", new org.apache.flink.core.fs.Path(), 12345L, inProgressFile, new HashMap<>());
        final RowWiseBucketWriter<String, String> bucketWriter =
                new RowWiseBucketWriter<>(baseStubWriter, ENCODER);
        // No life-cycle listener is registered; the typed local replaces a raw-cast null.
        final FileLifeCycleListener<String> noListener = null;
        return Bucket.restore(0, 1L, bucketWriter, rollingPolicy, stateToRestore, noListener, OutputFileConfig.builder().build());
    }

    /**
     * Restores a bucket from a hand-built state that has no in-progress file and the pending
     * parts produced by {@link #createPendingPartsPerCheckpoint(int)}.
     *
     * @param baseStubWriter the stub writer wrapped by the bucket's row-wise bucket writer
     * @param i the number of pending-part entries to put into the state
     * @return the restored bucket
     * @throws IOException if restoring the bucket fails
     */
    private Bucket<String, String> getRestoredBucketWithOnlyPendingParts(BaseStubWriter baseStubWriter, int i) throws IOException {
        // Typed null: the state deliberately carries no in-progress file.
        final InProgressFileWriter.InProgressFileRecoverable noInProgressFile = null;
        final BucketState<String> stateToRestore =
                new BucketState<>("test", new org.apache.flink.core.fs.Path(), 12345L, noInProgressFile, createPendingPartsPerCheckpoint(i));
        final RowWiseBucketWriter<String, String> bucketWriter =
                new RowWiseBucketWriter<>(baseStubWriter, ENCODER);
        // No life-cycle listener is registered; the typed local replaces a raw-cast null.
        final FileLifeCycleListener<String> noListener = null;
        return Bucket.restore(0, 1L, bucketWriter, rollingPolicy, stateToRestore, noListener, OutputFileConfig.builder().build());
    }

    /**
     * Builds a map with one entry per checkpoint id in {@code [0, i)}; each entry holds a
     * single-element list containing one pending file recoverable backed by a
     * {@link NoOpRecoverable}.
     *
     * @param i the number of checkpoint entries to create
     * @return a mutable map from checkpoint id to its list of pending file recoverables
     */
    private Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> createPendingPartsPerCheckpoint(int i) {
        final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingPartsPerCheckpoint = new HashMap<>();
        for (long checkpointId = 0; checkpointId < i; checkpointId++) {
            final List<InProgressFileWriter.PendingFileRecoverable> pendingParts = new ArrayList<>();
            pendingParts.add(new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(new NoOpRecoverable()));
            pendingPartsPerCheckpoint.put(checkpointId, pendingParts);
        }
        return pendingPartsPerCheckpoint;
    }
}
