package org.apache.flink.runtime.shuffle;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredInternalShuffleMaster;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/shuffle/NettyShuffleMaster.class */
public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> {
    private final int buffersPerInputChannel;
    private final int floatingBuffersPerGate;
    private final Optional<Integer> maxRequiredBuffersPerGate;
    private final int sortShuffleMinParallelism;
    private final int sortShuffleMinBuffers;
    private final int networkBufferSize;

    @Nullable
    private final TieredInternalShuffleMaster tieredInternalShuffleMaster;
    private final Map<JobID, JobShuffleContext> jobShuffleContexts = new HashMap();

    public NettyShuffleMaster(Configuration configuration) {
        Preconditions.checkNotNull(configuration);
        this.buffersPerInputChannel = ((Integer) configuration.get(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL)).intValue();
        this.floatingBuffersPerGate = ((Integer) configuration.get(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE)).intValue();
        this.maxRequiredBuffersPerGate = configuration.getOptional(NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE);
        this.sortShuffleMinParallelism = ((Integer) configuration.get(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM)).intValue();
        this.sortShuffleMinBuffers = ((Integer) configuration.get(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS)).intValue();
        this.networkBufferSize = ConfigurationParserUtils.getPageSize(configuration);
        if (isHybridShuffleNewModeEnabled(configuration)) {
            this.tieredInternalShuffleMaster = new TieredInternalShuffleMaster(configuration);
        } else {
            this.tieredInternalShuffleMaster = null;
        }
        Preconditions.checkArgument(!this.maxRequiredBuffersPerGate.isPresent() || this.maxRequiredBuffersPerGate.get().intValue() >= 1, String.format("At least one buffer is required for each gate, please increase the value of %s.", NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE.key()));
        Preconditions.checkArgument(this.floatingBuffersPerGate >= 1, String.format("The configured floating buffer should be at least 1, please increase the value of %s.", NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.key()));
    }

    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
        ResultPartitionID resultPartitionID = new ResultPartitionID(partitionDescriptor.getPartitionId(), producerDescriptor.getProducerExecutionId());
        NettyShuffleDescriptor nettyShuffleDescriptor = new NettyShuffleDescriptor(producerDescriptor.getProducerLocation(), createConnectionInfo(producerDescriptor, partitionDescriptor.getConnectionIndex()), resultPartitionID);
        if (this.tieredInternalShuffleMaster != null) {
            this.tieredInternalShuffleMaster.addPartition(resultPartitionID);
        }
        return CompletableFuture.completedFuture(nettyShuffleDescriptor);
    }

    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
        if (this.tieredInternalShuffleMaster != null) {
            this.tieredInternalShuffleMaster.releasePartition(shuffleDescriptor.getResultPartitionID());
        }
    }

    private static NettyShuffleDescriptor.PartitionConnectionInfo createConnectionInfo(ProducerDescriptor producerDescriptor, int i) {
        return producerDescriptor.getDataPort() >= 0 ? NettyShuffleDescriptor.NetworkPartitionConnectionInfo.fromProducerDescriptor(producerDescriptor, i) : NettyShuffleDescriptor.LocalExecutionPartitionConnectionInfo.INSTANCE;
    }

    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public MemorySize computeShuffleMemorySizeForTask(TaskInputsOutputsDescriptor taskInputsOutputsDescriptor) {
        Preconditions.checkNotNull(taskInputsOutputsDescriptor);
        return new MemorySize(this.networkBufferSize * NettyShuffleUtils.computeNetworkBuffersForAnnouncing(this.buffersPerInputChannel, this.floatingBuffersPerGate, this.maxRequiredBuffersPerGate, this.sortShuffleMinParallelism, this.sortShuffleMinBuffers, taskInputsOutputsDescriptor.getInputChannelNums(), taskInputsOutputsDescriptor.getPartitionReuseCount(), taskInputsOutputsDescriptor.getSubpartitionNums(), taskInputsOutputsDescriptor.getInputPartitionTypes(), taskInputsOutputsDescriptor.getPartitionTypes()));
    }

    private boolean isHybridShuffleNewModeEnabled(Configuration configuration) {
        return (configuration.get(ExecutionOptions.BATCH_SHUFFLE_MODE) == BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL || configuration.get(ExecutionOptions.BATCH_SHUFFLE_MODE) == BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE) && ((Boolean) configuration.get(NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE)).booleanValue();
    }

    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public CompletableFuture<Collection<PartitionWithMetrics>> getAllPartitionWithMetrics(JobID jobID) {
        return ((JobShuffleContext) Preconditions.checkNotNull(this.jobShuffleContexts.get(jobID))).getAllPartitionWithMetricsOnTaskManagers();
    }

    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public void registerJob(JobShuffleContext jobShuffleContext) {
        this.jobShuffleContexts.put(jobShuffleContext.getJobId(), jobShuffleContext);
    }

    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public void unregisterJob(JobID jobID) {
        this.jobShuffleContexts.remove(jobID);
    }
}
