/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Map;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory;
import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class BulkWriterTest {
    @TempDir
    private static Path tempFolder;

    @Test
    void testCustomBulkWriter() throws Exception {
        File outDir = TempDirUtils.newFolder((Path)tempFolder);
        try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createTestSinkWithBulkEncoder(outDir, 1, 0, 10L, new TestUtils.TupleToStringBucketer(), new TestBulkWriterFactory(), (BucketFactory<Tuple2<String, Integer>, String>)new DefaultBucketFactoryImpl());){
            this.testPartFilesWithStringBucketer(testHarness, outDir, ".part-0-0.inprogress", ".part-0-1.inprogress");
        }
    }

    @Test
    void testCustomBulkWriterWithBucketAssigner() throws Exception {
        File outDir = TempDirUtils.newFolder((Path)tempFolder);
        try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createTestSinkWithCustomizedBulkEncoder(outDir, 1, 0, 10L, new TestUtils.TupleToIntegerBucketer(), new TestBulkWriterFactory(), new DefaultBucketFactoryImpl());){
            this.testPartFilesWithIntegerBucketer(testHarness, outDir, ".part-0-0.inprogress", ".part-0-1.inprogress", ".part-0-2.inprogress");
        }
    }

    @Test
    void testCustomBulkWriterWithPartConfig() throws Exception {
        File outDir = TempDirUtils.newFolder((Path)tempFolder);
        try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createTestSinkWithBulkEncoder(outDir, 1, 0, 10L, new TestUtils.TupleToStringBucketer(), new TestBulkWriterFactory(), (BucketFactory<Tuple2<String, Integer>, String>)new DefaultBucketFactoryImpl(), OutputFileConfig.builder().withPartPrefix("prefix").withPartSuffix(".ext").build());){
            this.testPartFilesWithStringBucketer(testHarness, outDir, ".prefix-0-0.ext.inprogress", ".prefix-0-1.ext.inprogress");
        }
    }

    private void testPartFilesWithStringBucketer(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness, File outDir, String partFileName1, String partFileName2) throws Exception {
        testHarness.setup();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)Tuple2.of((Object)"test1", (Object)1), 1L));
        TestUtils.checkLocalFs(outDir, 1, 0);
        testHarness.snapshot(1L, 1L);
        testHarness.processElement(new StreamRecord((Object)Tuple2.of((Object)"test1", (Object)2), 2L));
        testHarness.processElement(new StreamRecord((Object)Tuple2.of((Object)"test1", (Object)3), 3L));
        testHarness.snapshot(2L, 2L);
        TestUtils.checkLocalFs(outDir, 2, 0);
        Map<File, String> contents = TestUtils.getFileContentByPath(outDir);
        int fileCounter = 0;
        for (Map.Entry<File, String> fileContents : contents.entrySet()) {
            if (fileContents.getKey().getName().contains(partFileName1)) {
                ++fileCounter;
                Assertions.assertThat((String)fileContents.getValue()).isEqualTo("test1@1\n");
            } else if (fileContents.getKey().getName().contains(partFileName2)) {
                ++fileCounter;
                Assertions.assertThat((String)fileContents.getValue()).isEqualTo("test1@2\ntest1@3\n");
            }
            Assertions.assertThat((String)fileContents.getKey().getParentFile().getName()).isEqualTo("test1");
        }
        Assertions.assertThat((int)fileCounter).isEqualTo(2L);
        testHarness.notifyOfCompletedCheckpoint(2L);
        TestUtils.checkLocalFs(outDir, 0, 2);
    }

    private void testPartFilesWithIntegerBucketer(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness, File outDir, String partFileName1, String partFileName2, String partFileName3) throws Exception {
        testHarness.setup();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)Tuple2.of((Object)"test1", (Object)1), 1L));
        TestUtils.checkLocalFs(outDir, 1, 0);
        testHarness.snapshot(1L, 1L);
        testHarness.processElement(new StreamRecord((Object)Tuple2.of((Object)"test1", (Object)2), 2L));
        testHarness.processElement(new StreamRecord((Object)Tuple2.of((Object)"test1", (Object)3), 3L));
        testHarness.snapshot(2L, 2L);
        TestUtils.checkLocalFs(outDir, 3, 0);
        Map<File, String> contents = TestUtils.getFileContentByPath(outDir);
        int fileCounter = 0;
        for (Map.Entry<File, String> fileContents : contents.entrySet()) {
            if (fileContents.getKey().getName().contains(partFileName1)) {
                ++fileCounter;
                Assertions.assertThat((String)fileContents.getValue()).isEqualTo("test1@1\n");
                Assertions.assertThat((String)fileContents.getKey().getParentFile().getName()).isEqualTo("1");
                continue;
            }
            if (fileContents.getKey().getName().contains(partFileName2)) {
                ++fileCounter;
                Assertions.assertThat((String)fileContents.getValue()).isEqualTo("test1@2\n");
                Assertions.assertThat((String)fileContents.getKey().getParentFile().getName()).isEqualTo("2");
                continue;
            }
            if (!fileContents.getKey().getName().contains(partFileName3)) continue;
            ++fileCounter;
            Assertions.assertThat((String)fileContents.getValue()).isEqualTo("test1@3\n");
            Assertions.assertThat((String)fileContents.getKey().getParentFile().getName()).isEqualTo("3");
        }
        Assertions.assertThat((int)fileCounter).isEqualTo(3L);
        testHarness.notifyOfCompletedCheckpoint(2L);
        TestUtils.checkLocalFs(outDir, 0, 3);
    }

    public static final class TestBulkWriterFactory
    implements BulkWriter.Factory<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public BulkWriter<Tuple2<String, Integer>> create(FSDataOutputStream out) {
            return new TestBulkWriter(out);
        }
    }

    private static class TestBulkWriter
    implements BulkWriter<Tuple2<String, Integer>> {
        private static final Charset CHARSET = StandardCharsets.UTF_8;
        private final FSDataOutputStream stream;

        TestBulkWriter(FSDataOutputStream stream) {
            this.stream = (FSDataOutputStream)Preconditions.checkNotNull((Object)stream);
        }

        public void addElement(Tuple2<String, Integer> element) throws IOException {
            this.stream.write(((String)element.f0 + "@" + element.f1 + "\n").getBytes(CHARSET));
        }

        public void flush() throws IOException {
            this.stream.flush();
        }

        public void finish() throws IOException {
            this.flush();
        }
    }
}

