/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketState;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketStateGenerator;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketStatePathResolver;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketStateSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.FileUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class BucketStateSerializerTest {
    private static final int CURRENT_VERSION = 2;
    @Parameter
    private Integer previousVersion;
    private static final String IN_PROGRESS_CONTENT = "writing";
    private static final String PENDING_CONTENT = "wrote";
    private static final String BUCKET_ID = "test-bucket";
    private static final java.nio.file.Path BASE_PATH = Paths.get("src/test/resources/", new String[0]).resolve("bucket-state-migration-test");
    private final BucketStateGenerator generator = new BucketStateGenerator("test-bucket", "writing", "wrote", BASE_PATH, 2);

    BucketStateSerializerTest() {
    }

    @Parameters(name="Previous Version = {0}")
    private static Collection<Integer> previousVersions() {
        return Arrays.asList(1, 2);
    }

    @TestTemplate
    @Disabled
    void prepareDeserializationEmpty() throws IOException {
        this.generator.prepareDeserializationEmpty();
    }

    @TestTemplate
    void testSerializationEmpty() throws IOException {
        String scenarioName = "empty";
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, this.previousVersion);
        java.nio.file.Path outputPath = pathResolver.getOutputPath("empty");
        Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
        BucketState<String> recoveredState = BucketStateSerializerTest.readBucketState("empty", this.previousVersion);
        Bucket<String, String> bucket = BucketStateSerializerTest.restoreBucket(0, recoveredState);
        Assertions.assertThat((Object)bucket.getBucketPath()).isEqualTo((Object)testBucketPath);
        Assertions.assertThat((Object)bucket.getInProgressPart()).isNull();
        Assertions.assertThat((Map)bucket.getPendingFileRecoverablesPerCheckpoint()).isEmpty();
    }

    @TestTemplate
    @Disabled
    void prepareDeserializationOnlyInProgress() throws IOException {
        this.generator.prepareDeserializationOnlyInProgress();
    }

    @TestTemplate
    void testSerializationOnlyInProgress() throws IOException {
        String scenarioName = "only-in-progress";
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, this.previousVersion);
        java.nio.file.Path outputPath = pathResolver.getOutputPath("only-in-progress");
        Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
        BucketState<String> recoveredState = BucketStateSerializerTest.readBucketState("only-in-progress", this.previousVersion);
        Bucket<String, String> bucket = BucketStateSerializerTest.restoreBucket(0, recoveredState);
        Assertions.assertThat((Object)bucket.getBucketPath()).isEqualTo((Object)testBucketPath);
        Assertions.assertThat((long)bucket.getInProgressPart().getSize()).isEqualTo(8L);
        long numFiles = Files.list(Paths.get(testBucketPath.toString(), new String[0])).map(file -> {
            Assertions.assertThat((String)file.getFileName().toString()).startsWith((CharSequence)".part-0-0.inprogress");
            return 1;
        }).count();
        Assertions.assertThat((long)numFiles).isOne();
    }

    @TestTemplate
    @Disabled
    void prepareDeserializationFull() throws IOException {
        this.generator.prepareDeserializationFull();
    }

    @TestTemplate
    void testSerializationFull() throws IOException {
        this.testDeserializationFull(true, "full");
    }

    @TestTemplate
    @Disabled
    public void prepareDeserializationNullInProgress() throws IOException {
        this.generator.prepareDeserializationNullInProgress();
    }

    @TestTemplate
    void testSerializationNullInProgress() throws IOException {
        this.testDeserializationFull(false, "full-no-in-progress");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testDeserializationFull(boolean withInProgress, String scenarioName) throws IOException {
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, this.previousVersion);
        try {
            java.nio.file.Path outputPath = pathResolver.getOutputPath(scenarioName);
            Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
            BucketState<String> recoveredState = BucketStateSerializerTest.readBucketStateFromTemplate(scenarioName, this.previousVersion);
            int noOfPendingCheckpoints = 5;
            Map pendingFileRecoverables = recoveredState.getPendingFileRecoverablesPerCheckpoint();
            Assertions.assertThat((Map)pendingFileRecoverables).hasSize(5);
            Set beforeRestorePaths = Files.list(outputPath.resolve(BUCKET_ID)).map(file -> file.getFileName().toString()).collect(Collectors.toSet());
            for (int i = 0; i < 5; ++i) {
                String part = ".part-0-" + i + ".inprogress";
                Assertions.assertThat(beforeRestorePaths).anyMatch(item -> item.startsWith(part));
            }
            Bucket<String, String> bucket = BucketStateSerializerTest.restoreBucket(6, recoveredState);
            Assertions.assertThat((Object)bucket.getBucketPath()).isEqualTo((Object)testBucketPath);
            Assertions.assertThat((List)bucket.getPendingFileRecoverablesForCurrentCheckpoint()).isEmpty();
            Set afterRestorePaths = Files.list(outputPath.resolve(BUCKET_ID)).map(file -> file.getFileName().toString()).collect(Collectors.toSet());
            for (int i = 0; i < 5; ++i) {
                String part = "part-0-" + i;
                Assertions.assertThat(afterRestorePaths).contains((Object[])new String[]{part});
                afterRestorePaths.remove(part);
            }
            if (withInProgress) {
                Assertions.assertThat(afterRestorePaths).hasSize(1);
                Assertions.assertThat(afterRestorePaths).anyMatch(item -> item.startsWith(".part-0-5.inprogress"));
            } else {
                Assertions.assertThat(afterRestorePaths).isEmpty();
            }
        }
        finally {
            FileUtils.deleteDirectory((File)pathResolver.getResourcePath(scenarioName).toFile());
        }
    }

    private static Bucket<String, String> restoreBucket(int initialPartCounter, BucketState<String> bucketState) throws IOException {
        return Bucket.restore((int)0, (long)initialPartCounter, BucketStateSerializerTest.createBucketWriter(), (RollingPolicy)DefaultRollingPolicy.builder().withMaxPartSize(new MemorySize(10L)).build(), bucketState, null, (OutputFileConfig)OutputFileConfig.builder().build());
    }

    private static RowWiseBucketWriter<String, String> createBucketWriter() throws IOException {
        return new RowWiseBucketWriter(FileSystem.getLocalFileSystem().createRecoverableWriter(), (Encoder)new SimpleStringEncoder());
    }

    private static SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer() throws IOException {
        RowWiseBucketWriter<String, String> bucketWriter = BucketStateSerializerTest.createBucketWriter();
        return new BucketStateSerializer(bucketWriter.getProperties().getInProgressFileRecoverableSerializer(), bucketWriter.getProperties().getPendingFileRecoverableSerializer(), (SimpleVersionedSerializer)SimpleVersionedStringSerializer.INSTANCE);
    }

    private static BucketState<String> readBucketState(String scenarioName, int version) throws IOException {
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, version);
        byte[] bytes = Files.readAllBytes(pathResolver.getSnapshotPath(scenarioName));
        return (BucketState)SimpleVersionedSerialization.readVersionAndDeSerialize(BucketStateSerializerTest.bucketStateSerializer(), (byte[])bytes);
    }

    private static BucketState<String> readBucketStateFromTemplate(String scenarioName, int version) throws IOException {
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, version);
        java.nio.file.Path scenarioPath = pathResolver.getResourcePath(scenarioName);
        FileUtils.deleteDirectory((File)scenarioPath.toFile());
        FileUtils.copy((Path)new Path(scenarioPath.toString() + "-template"), (Path)new Path(scenarioPath.toString()), (boolean)false);
        return BucketStateSerializerTest.readBucketState(scenarioName, version);
    }
}

