/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.cleanup;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource;
import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleaner;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.RetryStrategy;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ResourceCleaner} that runs a fixed set of labeled cleanup steps for a job.
 *
 * <p>Prioritized cleanups are chained one after another in the order they were registered on the
 * {@link Builder}; a later prioritized cleanup is only triggered once the previous one completed
 * successfully. Regular cleanups are triggered only after the whole prioritized chain succeeded
 * and are then combined through {@code FutureUtils.completeAll}.
 *
 * <p>Every individual cleanup is wrapped in {@code FutureUtils.retryWithDelay} using the
 * configured {@link RetryStrategy}; each failed attempt is logged at WARN level.
 *
 * @param <T> type of the resources that are cleaned up
 */
public class DefaultResourceCleaner<T> implements ResourceCleaner {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceCleaner.class);
    private final ComponentMainThreadExecutor mainThreadExecutor;
    private final Executor cleanupExecutor;
    private final CleanupFn<T> cleanupFn;
    private final Collection<CleanupWithLabel<T>> prioritizedCleanup;
    private final Collection<CleanupWithLabel<T>> regularCleanup;
    private final RetryStrategy retryStrategy;

    /**
     * Creates a builder whose cleanups invoke {@link LocallyCleanableResource#localCleanupAsync}.
     *
     * @param mainThreadExecutor executor whose main thread {@link #cleanupAsync(JobID)} must be
     *     called from; also used to schedule retries
     * @param cleanupExecutor executor handed to each cleanup invocation
     * @param retryStrategy strategy applied to every individual cleanup
     * @return a new, empty builder
     */
    public static Builder<LocallyCleanableResource> forLocallyCleanableResources(ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor, RetryStrategy retryStrategy) {
        return DefaultResourceCleaner.forCleanableResources(mainThreadExecutor, cleanupExecutor, LocallyCleanableResource::localCleanupAsync, retryStrategy);
    }

    /**
     * Creates a builder whose cleanups invoke {@link GloballyCleanableResource#globalCleanupAsync}.
     *
     * @param mainThreadExecutor executor whose main thread {@link #cleanupAsync(JobID)} must be
     *     called from; also used to schedule retries
     * @param cleanupExecutor executor handed to each cleanup invocation
     * @param retryStrategy strategy applied to every individual cleanup
     * @return a new, empty builder
     */
    public static Builder<GloballyCleanableResource> forGloballyCleanableResources(ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor, RetryStrategy retryStrategy) {
        return DefaultResourceCleaner.forCleanableResources(mainThreadExecutor, cleanupExecutor, GloballyCleanableResource::globalCleanupAsync, retryStrategy);
    }

    /**
     * Creates a builder for an arbitrary resource type and cleanup function.
     *
     * @param mainThreadExecutor executor whose main thread {@link #cleanupAsync(JobID)} must be
     *     called from; also used to schedule retries
     * @param cleanupExecutor executor handed to each cleanup invocation
     * @param cleanupFunction function that triggers the cleanup of a single resource
     * @param retryStrategy strategy applied to every individual cleanup
     * @param <T> type of the resources that are cleaned up
     * @return a new, empty builder
     */
    @VisibleForTesting
    static <T> Builder<T> forCleanableResources(ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor, CleanupFn<T> cleanupFunction, RetryStrategy retryStrategy) {
        return new Builder<>(mainThreadExecutor, cleanupExecutor, cleanupFunction, retryStrategy);
    }

    private DefaultResourceCleaner(ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor, CleanupFn<T> cleanupFn, Collection<CleanupWithLabel<T>> prioritizedCleanup, Collection<CleanupWithLabel<T>> regularCleanup, RetryStrategy retryStrategy) {
        this.mainThreadExecutor = mainThreadExecutor;
        this.cleanupExecutor = cleanupExecutor;
        this.cleanupFn = cleanupFn;
        // Snapshot the collections so that later changes to the builder cannot alter the set of
        // cleanups of an already-built cleaner.
        this.prioritizedCleanup = new ArrayList<>(prioritizedCleanup);
        this.regularCleanup = new ArrayList<>(regularCleanup);
        this.retryStrategy = retryStrategy;
    }

    /**
     * Triggers all registered cleanups for the given job.
     *
     * <p>Asserts that it is called from the main thread of the configured {@link
     * ComponentMainThreadExecutor}.
     *
     * @param jobId job whose resources should be cleaned up
     * @return future that is composed of the prioritized cleanup chain followed by the combined
     *     regular cleanups
     */
    @Override
    public CompletableFuture<Void> cleanupAsync(JobID jobId) {
        this.mainThreadExecutor.assertRunningInMainThread();

        // Must be a CompletableFuture (not a CompletionStage) so that the composed result matches
        // the declared return type.
        CompletableFuture<Void> cleanupFuture = FutureUtils.completedVoidFuture();
        for (CleanupWithLabel<T> cleanupWithLabel : this.prioritizedCleanup) {
            cleanupFuture = cleanupFuture.thenCompose(ignoredValue -> this.withRetry(jobId, cleanupWithLabel.getLabel(), cleanupWithLabel.getCleanup()));
        }

        // The regular cleanups are only created (and thereby triggered) once the prioritized
        // chain has completed successfully.
        return cleanupFuture.thenCompose(ignoredValue -> FutureUtils.completeAll(this.regularCleanup.stream().map(cleanupWithLabel -> this.withRetry(jobId, cleanupWithLabel.getLabel(), cleanupWithLabel.getCleanup())).collect(Collectors.toList())));
    }

    /**
     * Runs a single cleanup through {@code FutureUtils.retryWithDelay}, logging every failed
     * attempt at WARN level.
     *
     * @param jobId job whose resource is cleaned up
     * @param label human-readable name of the cleanup, used in the log message
     * @param cleanup resource to clean up
     * @return the future returned by {@code FutureUtils.retryWithDelay}
     */
    private CompletableFuture<Void> withRetry(JobID jobId, String label, T cleanup) {
        return FutureUtils.retryWithDelay(() -> this.cleanupFn.cleanupAsync(cleanup, jobId, this.cleanupExecutor).whenComplete((value, throwable) -> {
            if (throwable != null) {
                String logMessage = String.format("Cleanup of %s failed for job %s due to a %s: %s", label, jobId, throwable.getClass().getSimpleName(), throwable.getMessage());
                // The message is always logged at WARN; the stack trace is only attached when
                // TRACE logging is enabled.
                if (LOG.isTraceEnabled()) {
                    LOG.warn(logMessage, throwable);
                } else {
                    LOG.warn(logMessage);
                }
            }
        }), this.retryStrategy, (ScheduledExecutor)this.mainThreadExecutor);
    }

    /**
     * Pairs a cleanable resource with the label used when logging about it.
     *
     * @param <CLEANUP_TYPE> type of the wrapped resource
     */
    private static class CleanupWithLabel<CLEANUP_TYPE> {
        private final CLEANUP_TYPE cleanup;
        private final String label;

        public CleanupWithLabel(CLEANUP_TYPE cleanup, String label) {
            this.cleanup = cleanup;
            this.label = label;
        }

        public CLEANUP_TYPE getCleanup() {
            return this.cleanup;
        }

        public String getLabel() {
            return this.label;
        }
    }

    /**
     * Collects prioritized and regular cleanups and creates {@link DefaultResourceCleaner}
     * instances from them.
     *
     * @param <T> type of the resources that are cleaned up
     */
    public static class Builder<T> {
        private final ComponentMainThreadExecutor mainThreadExecutor;
        private final Executor cleanupExecutor;
        private final CleanupFn<T> cleanupFn;
        private final RetryStrategy retryStrategy;
        private final Collection<CleanupWithLabel<T>> prioritizedCleanup = new ArrayList<>();
        private final Collection<CleanupWithLabel<T>> regularCleanup = new ArrayList<>();

        private Builder(ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor, CleanupFn<T> cleanupFn, RetryStrategy retryStrategy) {
            this.mainThreadExecutor = mainThreadExecutor;
            this.cleanupExecutor = cleanupExecutor;
            this.cleanupFn = cleanupFn;
            this.retryStrategy = retryStrategy;
        }

        /**
         * Registers a cleanup that is executed as part of the sequential, prioritized chain.
         * Prioritized cleanups run in the order in which they were added.
         *
         * @param label name of the cleanup used in log messages
         * @param prioritizedCleanup resource to clean up
         * @return this builder
         */
        public Builder<T> withPrioritizedCleanup(String label, T prioritizedCleanup) {
            this.prioritizedCleanup.add(new CleanupWithLabel<>(prioritizedCleanup, label));
            return this;
        }

        /**
         * Registers a cleanup that is triggered after all prioritized cleanups succeeded.
         *
         * @param label name of the cleanup used in log messages
         * @param regularCleanup resource to clean up
         * @return this builder
         */
        public Builder<T> withRegularCleanup(String label, T regularCleanup) {
            this.regularCleanup.add(new CleanupWithLabel<>(regularCleanup, label));
            return this;
        }

        /**
         * Creates a cleaner from the cleanups registered so far. The cleaner takes a snapshot of
         * the registered cleanups, so cleanups added to this builder afterwards do not affect it.
         *
         * @return the new cleaner
         */
        public DefaultResourceCleaner<T> build() {
            return new DefaultResourceCleaner<>(this.mainThreadExecutor, this.cleanupExecutor, this.cleanupFn, this.prioritizedCleanup, this.regularCleanup, this.retryStrategy);
        }
    }

    /**
     * Function that triggers the asynchronous cleanup of a single resource for a job.
     *
     * @param <T> type of the resource that is cleaned up
     */
    @FunctionalInterface
    @VisibleForTesting
    static interface CleanupFn<T> {
        /**
         * Triggers the cleanup.
         *
         * @param resource resource to clean up
         * @param jobId job the cleanup is performed for
         * @param cleanupExecutor executor passed through to the cleanup
         * @return future representing the cleanup attempt
         */
        public CompletableFuture<Void> cleanupAsync(T resource, JobID jobId, Executor cleanupExecutor);
    }
}

