package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.TriConsumerWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/blob/BlobServerCleanupTest.class */
class BlobServerCleanupTest {
    private static final Random RANDOM = new Random();

    @TempDir
    private File temporaryFolder;

    BlobServerCleanupTest() {
    }

    private static byte[] createRandomData() {
        byte[] bArr = new byte[2000000];
        RANDOM.nextBytes(bArr);
        return bArr;
    }

    private static BlobServer createTestInstance(String str, long j) throws IOException {
        return createTestInstance(str, j, new VoidBlobStore());
    }

    private static BlobServer createTestInstance(String str, long j, BlobStore blobStore) throws IOException {
        Configuration configuration = new Configuration();
        configuration.set(BlobServerOptions.STORAGE_DIRECTORY, str);
        configuration.set(BlobServerOptions.CLEANUP_INTERVAL, Long.valueOf(j));
        return new BlobServer(configuration, new File(str), blobStore);
    }

    @Test
    void testTransientBlobNoJobCleanup() throws IOException, InterruptedException, ExecutionException {
        testTransientBlobCleanup(null);
    }

    @Test
    void testTransientBlobForJobCleanup() throws IOException, InterruptedException, ExecutionException {
        testTransientBlobCleanup(new JobID());
    }

    private void testTransientBlobCleanup(@Nullable JobID jobID) throws IOException, InterruptedException, ExecutionException {
        ArrayList arrayList = new ArrayList(3);
        byte[] createRandomData = createRandomData();
        byte[] createRandomData2 = createRandomData();
        BlobServer createTestInstance = createTestInstance(this.temporaryFolder.getAbsolutePath(), 1L);
        Throwable th = null;
        try {
            try {
                ConcurrentMap blobExpiryTimes = createTestInstance.getBlobExpiryTimes();
                createTestInstance.start();
                long currentTimeMillis = System.currentTimeMillis() + 1;
                BlobKey blobKey = (TransientBlobKey) BlobServerPutTest.put((BlobService) createTestInstance, jobID, createRandomData, BlobKey.BlobType.TRANSIENT_BLOB);
                Long l = (Long) blobExpiryTimes.get(Tuple2.of(jobID, blobKey));
                Assertions.assertThat(l).isGreaterThanOrEqualTo(currentTimeMillis);
                long currentTimeMillis2 = System.currentTimeMillis() + 1;
                BlobKey blobKey2 = (TransientBlobKey) BlobServerPutTest.put((BlobService) createTestInstance, jobID, createRandomData2, BlobKey.BlobType.TRANSIENT_BLOB);
                Long l2 = (Long) blobExpiryTimes.get(Tuple2.of(jobID, blobKey2));
                Assertions.assertThat(l2).isGreaterThanOrEqualTo(currentTimeMillis2);
                JobID jobID2 = jobID == null ? new JobID() : jobID;
                BlobKey put = BlobServerPutTest.put((BlobService) createTestInstance, jobID2, createRandomData, BlobKey.BlobType.PERMANENT_BLOB);
                Thread.sleep(1L);
                long currentTimeMillis3 = System.currentTimeMillis() + 1;
                BlobServerPutTest.verifyContents((BlobService) createTestInstance, jobID, blobKey, createRandomData);
                Long l3 = (Long) blobExpiryTimes.get(Tuple2.of(jobID, blobKey));
                Assertions.assertThat(l3).isGreaterThan(l);
                Assertions.assertThat(l3).isGreaterThanOrEqualTo(currentTimeMillis3);
                Assertions.assertThat(l2).isEqualTo(blobExpiryTimes.get(Tuple2.of(jobID, blobKey2)));
                Thread.sleep(1L);
                long currentTimeMillis4 = System.currentTimeMillis() + 1;
                BlobServerPutTest.verifyContents((BlobService) createTestInstance, jobID, blobKey2, createRandomData2);
                Assertions.assertThat(l3).isEqualTo(blobExpiryTimes.get(Tuple2.of(jobID, blobKey)));
                Assertions.assertThat((Long) blobExpiryTimes.get(Tuple2.of(jobID, blobKey2))).isGreaterThan(l2);
                Assertions.assertThat((Long) blobExpiryTimes.get(Tuple2.of(jobID, blobKey2))).isGreaterThanOrEqualTo(currentTimeMillis4);
                long currentTimeMillis5 = System.currentTimeMillis() + (3 * 1);
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
                for (int i = 0; i < 3; i++) {
                    arrayList.add(CompletableFuture.supplyAsync(() -> {
                        while (System.currentTimeMillis() < currentTimeMillis5) {
                            try {
                                BlobServerGetTest.get(createTestInstance, jobID, blobKey);
                            } catch (IOException e) {
                                throw new CompletionException((Throwable) new FlinkException("Could not retrieve blob.", e));
                            }
                        }
                        return null;
                    }, newFixedThreadPool));
                }
                FutureUtils.combineAll(arrayList).get();
                BlobCachePutTest.verifyDeletedEventually(createTestInstance, jobID, blobKey, blobKey2);
                BlobServerPutTest.verifyContents((BlobService) createTestInstance, jobID2, put, createRandomData);
                if (createTestInstance != null) {
                    if (0 == 0) {
                        createTestInstance.close();
                        return;
                    }
                    try {
                        createTestInstance.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTestInstance != null) {
                if (th != null) {
                    try {
                        createTestInstance.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTestInstance.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testLocalCleanup() throws Exception {
        testSuccessfulCleanup(new JobID(), (blobServer, jobID, executor) -> {
        }, createTestingBlobStoreBuilder().setDeleteAllFunction(jobID2 -> {
            return (Boolean) Assertions.fail("No deleteAll call is expected to be triggered but was for %s.", new Object[]{jobID2});
        }).createTestingBlobStore());
    }

    @Test
    void testGlobalCleanup() throws Exception {
        HashSet hashSet = new HashSet();
        JobID jobID = new JobID();
        testSuccessfulCleanup(jobID, (blobServer, jobID2, executor) -> {
        }, createTestingBlobStoreBuilder().setDeleteAllFunction(jobID3 -> {
            hashSet.add(jobID3);
            return true;
        }).createTestingBlobStore());
        Assertions.assertThat(hashSet).containsExactlyInAnyOrder(new JobID[]{jobID});
    }

    @Test
    void testGlobalCleanupUnsuccessfulInBlobStore() throws Exception {
        testFailedCleanup(new JobID(), (blobServer, jobID, executor) -> {
            Assertions.assertThatThrownBy(() -> {
            }).isInstanceOf(ExecutionException.class).hasCauseInstanceOf(IOException.class);
        }, createTestingBlobStoreBuilder().setDeleteAllFunction(jobID2 -> {
            return false;
        }).createTestingBlobStore());
    }

    @Test
    void testGlobalCleanupFailureInBlobStore() throws Exception {
        RuntimeException runtimeException = new RuntimeException("Expected RuntimeException");
        testFailedCleanup(new JobID(), (blobServer, jobID, executor) -> {
            Assertions.assertThatThrownBy(() -> {
            }).isInstanceOf(ExecutionException.class).hasCause(runtimeException);
        }, createTestingBlobStoreBuilder().setDeleteAllFunction(jobID2 -> {
            throw runtimeException;
        }).createTestingBlobStore());
    }

    private TestingBlobStoreBuilder createTestingBlobStoreBuilder() {
        return new TestingBlobStoreBuilder().setDeleteFunction((jobID, blobKey) -> {
            throw new UnsupportedOperationException("Deletion of individual blobs is not supported.");
        });
    }

    private void testFailedCleanup(JobID jobID, TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> triConsumerWithException, BlobStore blobStore) throws Exception {
        testCleanup(jobID, triConsumerWithException, blobStore, 2);
    }

    private void testSuccessfulCleanup(JobID jobID, TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> triConsumerWithException, BlobStore blobStore) throws Exception {
        testCleanup(jobID, triConsumerWithException, blobStore, 0);
    }

    private void testCleanup(JobID jobID, TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> triConsumerWithException, BlobStore blobStore, int i) throws Exception {
        JobID jobID2 = new JobID();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            BlobServer createTestInstance = createTestInstance(this.temporaryFolder.getAbsolutePath(), 2147483647L, blobStore);
            Throwable th = null;
            try {
                try {
                    createTestInstance.start();
                    BlobKey put = BlobServerPutTest.put((BlobService) createTestInstance, jobID, createRandomData(), BlobKey.BlobType.TRANSIENT_BLOB);
                    BlobKey put2 = BlobServerPutTest.put((BlobService) createTestInstance, jobID2, createRandomData(), BlobKey.BlobType.TRANSIENT_BLOB);
                    BlobKey put3 = BlobServerPutTest.put((BlobService) createTestInstance, jobID, createRandomData(), BlobKey.BlobType.PERMANENT_BLOB);
                    BlobKey put4 = BlobServerPutTest.put((BlobService) createTestInstance, jobID2, createRandomData(), BlobKey.BlobType.PERMANENT_BLOB);
                    TestingBlobHelpers.checkFilesExist(jobID, Arrays.asList(put, put3), createTestInstance, true);
                    TestingBlobHelpers.checkFilesExist(jobID2, Arrays.asList(put2, put4), createTestInstance, true);
                    triConsumerWithException.accept(createTestInstance, jobID, newSingleThreadExecutor);
                    TestingBlobHelpers.checkFileCountForJob(i, jobID, createTestInstance);
                    TestingBlobHelpers.checkFilesExist(jobID2, Arrays.asList(put2, put4), createTestInstance, true);
                    if (createTestInstance != null) {
                        if (0 != 0) {
                            try {
                                createTestInstance.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createTestInstance.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            Assertions.assertThat(newSingleThreadExecutor.shutdownNow()).isEmpty();
        }
    }

    @Test
    void testBlobServerExpiresRecoveredTransientJobBlob() throws Exception {
        runBlobServerExpiresRecoveredTransientBlob(new JobID());
    }

    @Test
    void testBlobServerExpiresRecoveredTransientNoJobBlob() throws Exception {
        runBlobServerExpiresRecoveredTransientBlob(null);
    }

    private void runBlobServerExpiresRecoveredTransientBlob(@Nullable JobID jobID) throws Exception {
        File storageLocation = BlobUtils.getStorageLocation(this.temporaryFolder, jobID, TestingBlobUtils.writeTransientBlob(this.temporaryFolder.toPath(), jobID, new byte[]{1, 2, 3, 4}));
        BlobServer createTestInstance = createTestInstance(this.temporaryFolder.getAbsolutePath(), 1L);
        Throwable th = null;
        try {
            try {
                CommonTestUtils.waitUntilCondition(() -> {
                    return Boolean.valueOf(!storageLocation.exists());
                });
                if (createTestInstance != null) {
                    if (0 == 0) {
                        createTestInstance.close();
                        return;
                    }
                    try {
                        createTestInstance.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTestInstance != null) {
                if (th != null) {
                    try {
                        createTestInstance.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTestInstance.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testBlobServerRetainsJobs() throws Exception {
        JobID jobID = new JobID();
        JobID jobID2 = new JobID();
        byte[] bArr = {1, 2, 3, 4};
        PermanentBlobKey writePermanentBlob = TestingBlobUtils.writePermanentBlob(this.temporaryFolder.toPath(), jobID, bArr);
        PermanentBlobKey writePermanentBlob2 = TestingBlobUtils.writePermanentBlob(this.temporaryFolder.toPath(), jobID2, bArr);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            BlobServer createTestInstance = createTestInstance(this.temporaryFolder.getAbsolutePath(), ((Long) BlobServerOptions.CLEANUP_INTERVAL.defaultValue()).longValue());
            Throwable th = null;
            try {
                try {
                    createTestInstance.retainJobs(Collections.singleton(jobID), newSingleThreadExecutor);
                    Assertions.assertThat(createTestInstance.getFile(jobID, writePermanentBlob)).hasBinaryContent(bArr);
                    Assertions.assertThatThrownBy(() -> {
                        createTestInstance.getFile(jobID2, writePermanentBlob2);
                    }).isInstanceOf(NoSuchFileException.class);
                    if (createTestInstance != null) {
                        if (0 != 0) {
                            try {
                                createTestInstance.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createTestInstance.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            Assertions.assertThat(newSingleThreadExecutor.shutdownNow()).isEmpty();
        }
    }
}
