/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFilePool;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskExecutorFileMergingManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorFileMergingManager.class);
    @GuardedBy(value="lock")
    private final Map<JobID, Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>>> fileMergingSnapshotManagerByJobId;
    @GuardedBy(value="lock")
    private boolean closed = false;
    private final Object lock = new Object();
    private final Thread shutdownHook;

    public TaskExecutorFileMergingManager() {
        this.fileMergingSnapshotManagerByJobId = new HashMap<JobID, Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>>>();
        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, this.getClass().getSimpleName(), LOG);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    public FileMergingSnapshotManager fileMergingSnapshotManagerForTask(@Nonnull JobID jobId, @Nonnull ResourceID tmResourceId, @Nonnull ExecutionAttemptID executionAttemptID, Configuration clusterConfiguration, Configuration jobConfiguration, TaskManagerJobMetricGroup metricGroup) {
        boolean mergingEnabled = jobConfiguration.getOptional(CheckpointingOptions.FILE_MERGING_ENABLED).orElse(clusterConfiguration.get(CheckpointingOptions.FILE_MERGING_ENABLED));
        Object object = this.lock;
        synchronized (object) {
            if (this.closed) {
                throw new IllegalStateException("TaskExecutorFileMergingManager is already closed and cannot register a new FileMergingSnapshotManager.");
            }
            if (!mergingEnabled) {
                return null;
            }
            Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>> fileMergingSnapshotManagerAndRetainedExecutions = this.fileMergingSnapshotManagerByJobId.get(jobId);
            if (fileMergingSnapshotManagerAndRetainedExecutions == null) {
                FileMergingType fileMergingType = jobConfiguration.getOptional(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY).orElse(clusterConfiguration.get(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY)) != false ? FileMergingType.MERGE_ACROSS_CHECKPOINT : FileMergingType.MERGE_WITHIN_CHECKPOINT;
                MemorySize maxFileSize = jobConfiguration.getOptional(CheckpointingOptions.FILE_MERGING_MAX_FILE_SIZE).orElse(clusterConfiguration.get(CheckpointingOptions.FILE_MERGING_MAX_FILE_SIZE));
                Boolean usingBlockingPool = jobConfiguration.getOptional(CheckpointingOptions.FILE_MERGING_POOL_BLOCKING).orElse(clusterConfiguration.get(CheckpointingOptions.FILE_MERGING_POOL_BLOCKING));
                Float spaceAmplification = jobConfiguration.getOptional(CheckpointingOptions.FILE_MERGING_MAX_SPACE_AMPLIFICATION).orElse(clusterConfiguration.get(CheckpointingOptions.FILE_MERGING_MAX_SPACE_AMPLIFICATION));
                fileMergingSnapshotManagerAndRetainedExecutions = Tuple2.of(new FileMergingSnapshotManagerBuilder(jobId, tmResourceId, fileMergingType).setMaxFileSize(maxFileSize.getBytes()).setFilePoolType(usingBlockingPool != false ? PhysicalFilePool.Type.BLOCKING : PhysicalFilePool.Type.NON_BLOCKING).setMaxSpaceAmplification(spaceAmplification.floatValue()).setMetricGroup(metricGroup).build(), new HashSet());
                this.fileMergingSnapshotManagerByJobId.put(jobId, fileMergingSnapshotManagerAndRetainedExecutions);
                LOG.info("Registered new file merging snapshot manager for job {}.", (Object)jobId);
            }
            ((Set)fileMergingSnapshotManagerAndRetainedExecutions.f1).add(executionAttemptID);
            return (FileMergingSnapshotManager)fileMergingSnapshotManagerAndRetainedExecutions.f0;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void releaseMergingSnapshotManagerForTask(@Nonnull JobID jobId, @Nonnull ExecutionAttemptID executionAttemptID) {
        Object object = this.lock;
        synchronized (object) {
            Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>> fileMergingSnapshotManagerAndRetainedExecutions = this.fileMergingSnapshotManagerByJobId.get(jobId);
            if (fileMergingSnapshotManagerAndRetainedExecutions != null) {
                LOG.debug("Releasing file merging snapshot manager under job id {} and attempt {}.", (Object)jobId, (Object)executionAttemptID);
                ((Set)fileMergingSnapshotManagerAndRetainedExecutions.f1).remove(executionAttemptID);
                if (((Set)fileMergingSnapshotManagerAndRetainedExecutions.f1).isEmpty()) {
                    this.releaseMergingSnapshotManagerForJob(jobId);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void releaseMergingSnapshotManagerForJob(@Nonnull JobID jobId) {
        LOG.debug("Releasing file merging snapshot manager under job id {}.", (Object)jobId);
        Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>> toRelease = null;
        Object object = this.lock;
        synchronized (object) {
            if (this.closed) {
                return;
            }
            toRelease = this.fileMergingSnapshotManagerByJobId.remove(jobId);
        }
        if (toRelease != null) {
            if (!((Set)toRelease.f1).isEmpty()) {
                LOG.warn("The file merging snapshot manager for job {} is released before all tasks are released.", (Object)jobId);
            }
            try {
                ((FileMergingSnapshotManager)toRelease.f0).close();
            }
            catch (Exception e) {
                LOG.warn("Exception while closing TaskExecutorFileMergingManager for job {}.", (Object)jobId, (Object)e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        HashMap<JobID, Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>>> toRelease = null;
        Iterator iterator = this.lock;
        synchronized (iterator) {
            toRelease = new HashMap<JobID, Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>>>(this.fileMergingSnapshotManagerByJobId);
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.fileMergingSnapshotManagerByJobId.clear();
        }
        LOG.info("Shutting down TaskExecutorFileMergingManager.");
        ShutdownHookUtil.removeShutdownHook(this.shutdownHook, this.getClass().getSimpleName(), LOG);
        for (Map.Entry entry : toRelease.entrySet()) {
            if (entry.getValue() == null) continue;
            try {
                ((FileMergingSnapshotManager)((Tuple2)entry.getValue()).f0).close();
            }
            catch (Exception e) {
                LOG.warn("Exception while closing TaskExecutorFileMergingManager for job {}.", entry.getKey(), (Object)e);
            }
        }
    }
}

