package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.shuffle.DefaultShuffleMetrics;
import org.apache.flink.runtime.shuffle.ShuffleMetrics;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/ResultPartitionManager.class */
public class ResultPartitionManager implements ResultPartitionProvider {
    private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionManager.class);
    // Registered partitions keyed by their id. This map is also the lock object used by the
    // synchronized blocks in this class.
    private final Map<ResultPartitionID, ResultPartition> registeredPartitions;

    // Listeners waiting for a partition id that has no registered partition yet, grouped per id.
    @GuardedBy("registeredPartitions")
    private final Map<ResultPartitionID, PartitionRequestListenerManager> listenerManagers;

    // Handle of the periodic listener-timeout check; null when none was scheduled or after shutdown().
    @Nullable
    private ScheduledFuture<?> partitionListenerTimeoutChecker;
    // Timeout handed to the listener expiration check; also used as the check's initial delay and
    // period, in milliseconds.
    private final int partitionListenerTimeout;
    // Set to true by shutdown(); read and written while holding the registeredPartitions monitor.
    private boolean isShutdown;

    /**
     * Creates a manager without a listener timeout check: delegates with a timeout of {@code 0}
     * and a {@code null} executor, for which the two-argument constructor schedules nothing.
     */
    @VisibleForTesting
    public ResultPartitionManager() {
        this(0, null);
    }

    /**
     * Creates a manager and, when both arguments allow it, schedules the periodic check that
     * expires waiting partition request listeners.
     *
     * @param i listener timeout in milliseconds; it is also used as the initial delay and the
     *     period of the check. A non-positive value disables the check.
     * @param scheduledExecutor executor that runs the check; when {@code null} no check is
     *     scheduled
     */
    public ResultPartitionManager(int i, ScheduledExecutor scheduledExecutor) {
        this.registeredPartitions = CollectionUtil.newHashMapWithExpectedSize(16);
        this.listenerManagers = new HashMap<>();
        this.partitionListenerTimeout = i;
        // Only schedule the check when there is a positive timeout and an executor to run it on.
        if (i > 0 && scheduledExecutor != null) {
            this.partitionListenerTimeoutChecker =
                    scheduledExecutor.scheduleWithFixedDelay(
                            this::checkRequestPartitionListeners, i, i, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Registers a result partition under its id and hands it to every listener that was waiting
     * for that id.
     *
     * <p>The listener callbacks run after the {@code registeredPartitions} monitor is released.
     *
     * @param resultPartition the partition to register
     * @throws IOException declared by the signature; presumably raised by a listener callback
     * @throws IllegalStateException if a partition was already registered under the same id (the
     *     shutdown precondition check fails as well once {@link #shutdown()} has run)
     */
    public void registerResultPartition(ResultPartition resultPartition) throws IOException {
        final PartitionRequestListenerManager pendingListeners;
        synchronized (this.registeredPartitions) {
            Preconditions.checkState(!this.isShutdown, "Result partition manager already shut down.");
            final ResultPartitionID partitionId = resultPartition.getPartitionId();
            final ResultPartition previous = this.registeredPartitions.put(partitionId, resultPartition);
            if (previous != null) {
                throw new IllegalStateException("Result partition already registered.");
            }
            // Detach the waiting listeners while still holding the monitor.
            pendingListeners = this.listenerManagers.remove(partitionId);
        }
        if (pendingListeners != null) {
            for (PartitionRequestListener listener : pendingListeners.getPartitionRequestListeners()) {
                listener.notifyPartitionCreated(resultPartition);
            }
        }
        LOG.debug("Registered {}.", resultPartition);
    }

    /**
     * Creates a view over the requested subpartitions of an already registered partition.
     *
     * @param resultPartitionID id of the partition to read from
     * @param resultSubpartitionIndexSet the subpartitions to cover
     * @param bufferAvailabilityListener listener handed to the created view
     * @return the view created by the partition
     * @throws PartitionNotFoundException if no partition is registered under the given id
     * @throws IOException declared by the signature
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartitionProvider
    public ResultSubpartitionView createSubpartitionView(ResultPartitionID resultPartitionID, ResultSubpartitionIndexSet resultSubpartitionIndexSet, BufferAvailabilityListener bufferAvailabilityListener) throws IOException {
        synchronized (this.registeredPartitions) {
            final ResultPartition partition = this.registeredPartitions.get(resultPartitionID);
            if (partition != null) {
                LOG.debug("Requesting subpartitions {} of {}.", resultSubpartitionIndexSet, partition);
                return partition.createSubpartitionView(resultSubpartitionIndexSet, bufferAvailabilityListener);
            }
            throw new PartitionNotFoundException(resultPartitionID);
        }
    }

    /**
     * Creates a subpartition view when the partition is already registered; otherwise stores the
     * given listener under the partition id and returns an empty result.
     *
     * @param resultPartitionID id of the partition to read from
     * @param resultSubpartitionIndexSet the subpartitions to cover
     * @param bufferAvailabilityListener listener handed to the created view
     * @param partitionRequestListener listener to register when the partition is not registered
     * @return the created view, or empty when the listener was registered instead (or the
     *     partition produced a {@code null} view)
     * @throws IOException declared by the signature
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartitionProvider
    public Optional<ResultSubpartitionView> createSubpartitionViewOrRegisterListener(ResultPartitionID resultPartitionID, ResultSubpartitionIndexSet resultSubpartitionIndexSet, BufferAvailabilityListener bufferAvailabilityListener, PartitionRequestListener partitionRequestListener) throws IOException {
        synchronized (this.registeredPartitions) {
            final ResultPartition partition = this.registeredPartitions.get(resultPartitionID);
            if (partition != null) {
                LOG.debug("Requesting subpartitions {} of {}.", resultSubpartitionIndexSet, partition);
                return Optional.ofNullable(partition.createSubpartitionView(resultSubpartitionIndexSet, bufferAvailabilityListener));
            }
            // Partition not registered yet: park the listener under its id.
            final PartitionRequestListenerManager waiting =
                    this.listenerManagers.computeIfAbsent(resultPartitionID, ignoredId -> new PartitionRequestListenerManager());
            waiting.registerListener(partitionRequestListener);
            return Optional.empty();
        }
    }

    /**
     * Removes a waiting listener, identified by its receiver id, from the manager stored under
     * its partition id, and drops that manager once it is empty.
     *
     * @param partitionRequestListener the listener to remove
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartitionProvider
    public void releasePartitionRequestListener(PartitionRequestListener partitionRequestListener) {
        synchronized (this.registeredPartitions) {
            final ResultPartitionID partitionId = partitionRequestListener.getResultPartitionId();
            final PartitionRequestListenerManager manager = this.listenerManagers.get(partitionId);
            if (manager == null) {
                return;
            }
            manager.remove(partitionRequestListener.getReceiverId());
            if (manager.isEmpty()) {
                this.listenerManagers.remove(partitionId);
            }
        }
    }

    /**
     * Unregisters and releases the partition with the given id, then sends a timeout notification
     * to every listener that was still waiting for that id.
     *
     * <p>The partition is released while the {@code registeredPartitions} monitor is held; the
     * listener notifications run after it has been released.
     *
     * @param resultPartitionID id of the partition to release
     * @param th cause passed on to the partition's {@code release} call
     */
    public void releasePartition(ResultPartitionID resultPartitionID, Throwable th) {
        final PartitionRequestListenerManager orphanedListeners;
        synchronized (this.registeredPartitions) {
            final ResultPartition partition = this.registeredPartitions.remove(resultPartitionID);
            if (partition != null) {
                partition.release(th);
                LOG.debug("Released partition {} produced by {}.", resultPartitionID.getPartitionId(), resultPartitionID.getProducerId());
            }
            orphanedListeners = this.listenerManagers.remove(resultPartitionID);
        }
        if (orphanedListeners != null && !orphanedListeners.isEmpty()) {
            for (PartitionRequestListener listener : orphanedListeners.getPartitionRequestListeners()) {
                listener.notifyPartitionCreatedTimeout();
            }
        }
    }

    /**
     * Shuts the manager down: releases and forgets every registered partition, sends a timeout
     * notification to all waiting listeners, cancels the periodic timeout check if one was
     * scheduled, and marks the manager as shut down.
     *
     * <p>All of this happens while holding the {@code registeredPartitions} monitor.
     */
    public void shutdown() {
        synchronized (this.registeredPartitions) {
            LOG.debug("Releasing {} partitions because of shutdown.", this.registeredPartitions.size());
            for (ResultPartition partition : this.registeredPartitions.values()) {
                partition.release();
            }
            this.registeredPartitions.clear();
            releaseListenerManagers();
            final ScheduledFuture<?> checker = this.partitionListenerTimeoutChecker;
            if (checker != null) {
                // cancel(false): do not interrupt a check that is currently running.
                checker.cancel(false);
                this.partitionListenerTimeoutChecker = null;
            }
            this.isShutdown = true;
            LOG.debug("Successful shutdown.");
        }
    }

    /**
     * Sends a timeout notification to every waiting listener of every listener manager and then
     * clears the listener-manager map. Does not synchronize itself; {@link #shutdown()} calls it
     * while holding the {@code registeredPartitions} monitor.
     */
    private void releaseListenerManagers() {
        for (PartitionRequestListenerManager manager : this.listenerManagers.values()) {
            for (PartitionRequestListener listener : manager.getPartitionRequestListeners()) {
                listener.notifyPartitionCreatedTimeout();
            }
        }
        this.listenerManagers.clear();
    }

    /**
     * Periodic task that expires waiting partition request listeners.
     *
     * <p>While holding the {@code registeredPartitions} monitor it asks every listener manager to
     * move its expired listeners into a local list and drops managers that became empty. The
     * expired listeners are notified only after the monitor has been released, so the callbacks
     * never run under this manager's lock, matching {@link #registerResultPartition} and
     * {@link #releasePartition}. Does nothing once the manager is shut down.
     */
    private void checkRequestPartitionListeners() {
        final LinkedList<PartitionRequestListener> expiredListeners = new LinkedList<>();
        synchronized (this.registeredPartitions) {
            if (this.isShutdown) {
                return;
            }
            final long now = System.currentTimeMillis();
            final Iterator<PartitionRequestListenerManager> managers = this.listenerManagers.values().iterator();
            while (managers.hasNext()) {
                final PartitionRequestListenerManager manager = managers.next();
                manager.removeExpiration(now, this.partitionListenerTimeout, expiredListeners);
                if (manager.isEmpty()) {
                    // Removing through the values iterator also removes the map entry.
                    managers.remove();
                }
            }
        }
        // Callbacks run outside the lock: they are foreign code and must not block other callers.
        for (PartitionRequestListener listener : expiredListeners) {
            listener.notifyPartitionCreatedTimeout();
        }
    }

    /**
     * Returns the internal listener-manager map itself, not a copy, and does so without
     * synchronizing on {@code registeredPartitions}.
     */
    @VisibleForTesting
    public Map<ResultPartitionID, PartitionRequestListenerManager> getListenerManagers() {
        return this.listenerManagers;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles the notification that a partition has been consumed: removes whatever is registered
     * under the partition's id and releases the partition if the removed entry is that very
     * instance.
     *
     * @param resultPartition the partition that reported being consumed
     * @throws IllegalStateException via the precondition check, if listeners are still waiting
     *     for this partition id
     */
    public void onConsumedPartition(ResultPartition resultPartition) {
        LOG.debug("Received consume notification from {}.", resultPartition);
        synchronized (this.registeredPartitions) {
            final ResultPartitionID consumedId = resultPartition.getPartitionId();
            final ResultPartition removed = this.registeredPartitions.remove(consumedId);
            // Identity comparison: only release when the registered instance is the notifying one.
            if (removed == resultPartition) {
                resultPartition.release();
                LOG.debug("Released partition {} produced by {}.", consumedId.getPartitionId(), consumedId.getProducerId());
            }
            final PartitionRequestListenerManager leftover = this.listenerManagers.remove(consumedId);
            final boolean noListenersLeft = leftover == null || leftover.isEmpty();
            Preconditions.checkState(noListenersLeft, "The partition request listeners is not empty for " + consumedId);
        }
    }

    /**
     * Returns the ids of all currently registered partitions.
     *
     * <p>The result is a snapshot copied while holding the {@code registeredPartitions} monitor.
     * It is not backed by the internal map, so callers can iterate it from any thread without
     * racing against concurrent registrations or releases.
     *
     * @return a new collection holding the ids registered at the time of the call
     */
    public Collection<ResultPartitionID> getUnreleasedPartitions() {
        synchronized (this.registeredPartitions) {
            return new HashSet<>(this.registeredPartitions.keySet());
        }
    }

    /**
     * Builds shuffle metrics for a registered partition from a snapshot of its result partition
     * bytes.
     *
     * @param resultPartitionID id of the partition to look up
     * @return the metrics, or empty when no partition is registered under the id
     */
    public Optional<ShuffleMetrics> getMetricsOfPartition(ResultPartitionID resultPartitionID) {
        synchronized (this.registeredPartitions) {
            return Optional.ofNullable(this.registeredPartitions.get(resultPartitionID))
                    .map(partition -> new DefaultShuffleMetrics(partition.getResultPartitionBytes().createSnapshot()));
        }
    }
}
