/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.shuffle;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredInternalShuffleMaster;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
import org.apache.flink.runtime.shuffle.EmptyShuffleMasterSnapshot;
import org.apache.flink.runtime.shuffle.JobShuffleContext;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
import org.apache.flink.runtime.shuffle.ShuffleMasterSnapshot;
import org.apache.flink.runtime.shuffle.ShuffleMasterSnapshotContext;
import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.util.Preconditions;

/**
 * {@link ShuffleMaster} implementation that hands out {@link NettyShuffleDescriptor}s.
 *
 * <p>The master reads its buffer-related settings once from the {@link Configuration} supplied by
 * the {@link ShuffleMasterContext}. When a hybrid batch shuffle mode is configured together with
 * the "new mode" switch, tier-specific work is additionally delegated to a
 * {@link TieredInternalShuffleMaster}; otherwise that delegate is {@code null} and all
 * tier-related steps are skipped.
 *
 * <p>NOTE(review): the registered job contexts are kept in a plain {@link HashMap}; presumably
 * all methods are invoked from a single thread - confirm against the caller.
 */
public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> {
    private final int buffersPerInputChannel;
    private final int floatingBuffersPerGate;
    // Kept as Optional because it is passed through unchanged to NettyShuffleUtils.
    private final Optional<Integer> maxRequiredBuffersPerGate;
    private final int sortShuffleMinParallelism;
    private final int sortShuffleMinBuffers;
    private final int networkBufferSize;

    /** Delegate for tiered (hybrid) shuffle; {@code null} when the new hybrid mode is disabled. */
    @Nullable
    private final TieredInternalShuffleMaster tieredInternalShuffleMaster;

    /** Contexts of the currently registered jobs, keyed by job id. */
    private final Map<JobID, JobShuffleContext> jobShuffleContexts = new HashMap<>();

    /**
     * Creates the shuffle master from the configuration exposed by the given context.
     *
     * @param shuffleMasterContext context providing the configuration; also handed to the
     *     {@link TieredInternalShuffleMaster} when the new hybrid shuffle mode is enabled
     * @throws NullPointerException if the context or its configuration is {@code null}
     * @throws IllegalArgumentException if the configured maximum of required buffers per gate is
     *     present but smaller than 1, or if the configured floating buffers per gate is smaller
     *     than 1
     */
    public NettyShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
        Preconditions.checkNotNull(shuffleMasterContext, "shuffleMasterContext must not be null.");
        Configuration conf = shuffleMasterContext.getConfiguration();
        Preconditions.checkNotNull(conf);

        this.buffersPerInputChannel =
                conf.get(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
        this.floatingBuffersPerGate =
                conf.get(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
        this.maxRequiredBuffersPerGate =
                conf.getOptional(
                        NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE);
        this.sortShuffleMinParallelism =
                conf.get(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM);
        this.sortShuffleMinBuffers =
                conf.get(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
        this.networkBufferSize = ConfigurationParserUtils.getPageSize(conf);

        // Validate BEFORE creating the tiered shuffle master: that object is closeable (see
        // close()), and if validation failed after its creation nobody would ever close it.
        Preconditions.checkArgument(
                !this.maxRequiredBuffersPerGate.isPresent()
                        || this.maxRequiredBuffersPerGate.get() >= 1,
                String.format(
                        "At least one buffer is required for each gate, please increase the value of %s.",
                        NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE
                                .key()));
        Preconditions.checkArgument(
                this.floatingBuffersPerGate >= 1,
                String.format(
                        "The configured floating buffer should be at least 1, please increase the value of %s.",
                        NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.key()));

        this.tieredInternalShuffleMaster =
                isHybridShuffleNewModeEnabled(conf)
                        ? new TieredInternalShuffleMaster(shuffleMasterContext)
                        : null;
    }

    /**
     * Builds the shuffle descriptor for a partition produced by the given producer.
     *
     * <p>If tiered shuffle is active, the partition is also added to the
     * {@link TieredInternalShuffleMaster} and the tier descriptors it returns are embedded in the
     * result; otherwise the tier descriptor list passed to the descriptor is {@code null}.
     *
     * @param jobID id of the job owning the partition
     * @param partitionDescriptor describes the partition (id and connection index are read)
     * @param producerDescriptor describes the producer (execution id, location and data port are
     *     read)
     * @return an already completed future holding the new descriptor
     */
    @Override
    public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(
            JobID jobID,
            PartitionDescriptor partitionDescriptor,
            ProducerDescriptor producerDescriptor) {
        ResultPartitionID resultPartitionID =
                new ResultPartitionID(
                        partitionDescriptor.getPartitionId(),
                        producerDescriptor.getProducerExecutionId());

        List<TierShuffleDescriptor> tierShuffleDescriptors = null;
        if (this.tieredInternalShuffleMaster != null) {
            tierShuffleDescriptors =
                    this.tieredInternalShuffleMaster.addPartitionAndGetShuffleDescriptor(
                            jobID, resultPartitionID);
        }

        NettyShuffleDescriptor shuffleDeploymentDescriptor =
                new NettyShuffleDescriptor(
                        producerDescriptor.getProducerLocation(),
                        createConnectionInfo(
                                producerDescriptor, partitionDescriptor.getConnectionIndex()),
                        resultPartitionID,
                        tierShuffleDescriptors);
        return CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
    }

    /**
     * Forwards the release request to the tiered shuffle master, if one exists; otherwise this is
     * a no-op.
     *
     * @param shuffleDescriptor descriptor of the partition to release
     */
    @Override
    public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
        if (this.tieredInternalShuffleMaster != null) {
            this.tieredInternalShuffleMaster.releasePartition(shuffleDescriptor);
        }
    }

    /**
     * Chooses the connection info for a producer: a network connection when the producer exposes
     * a non-negative data port, the shared local-execution instance otherwise.
     */
    private static NettyShuffleDescriptor.PartitionConnectionInfo createConnectionInfo(
            ProducerDescriptor producerDescriptor, int connectionIndex) {
        if (producerDescriptor.getDataPort() >= 0) {
            return NettyShuffleDescriptor.NetworkPartitionConnectionInfo.fromProducerDescriptor(
                    producerDescriptor, connectionIndex);
        }
        return NettyShuffleDescriptor.LocalExecutionPartitionConnectionInfo.INSTANCE;
    }

    /**
     * Computes the shuffle memory announced for a task as
     * {@code networkBufferSize * requiredBuffers}, where the buffer count comes from
     * {@link NettyShuffleUtils#computeNetworkBuffersForAnnouncing}.
     *
     * @param desc inputs/outputs description of the task
     * @return the memory size in bytes
     * @throws NullPointerException if {@code desc} is {@code null}
     */
    @Override
    public MemorySize computeShuffleMemorySizeForTask(TaskInputsOutputsDescriptor desc) {
        Preconditions.checkNotNull(desc);
        int numRequiredNetworkBuffers =
                NettyShuffleUtils.computeNetworkBuffersForAnnouncing(
                        this.buffersPerInputChannel,
                        this.floatingBuffersPerGate,
                        this.maxRequiredBuffersPerGate,
                        this.sortShuffleMinParallelism,
                        this.sortShuffleMinBuffers,
                        desc.getInputChannelNums(),
                        desc.getPartitionReuseCount(),
                        desc.getSubpartitionNums(),
                        desc.getInputPartitionTypes(),
                        desc.getPartitionTypes());
        // Widen to long before multiplying so the product cannot overflow int.
        return new MemorySize((long) this.networkBufferSize * (long) numRequiredNetworkBuffers);
    }

    /**
     * Returns whether the batch shuffle mode is one of the two hybrid modes AND the new hybrid
     * shuffle mode switch is on. The switch is only read when a hybrid mode is configured.
     */
    private static boolean isHybridShuffleNewModeEnabled(Configuration conf) {
        BatchShuffleMode shuffleMode = conf.get(ExecutionOptions.BATCH_SHUFFLE_MODE);
        boolean hybridMode =
                shuffleMode == BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL
                        || shuffleMode == BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE;
        return hybridMode
                && conf.get(NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE);
    }

    /**
     * Delegates to the {@link JobShuffleContext} registered for the job.
     *
     * @param jobId id of a job previously passed to {@link #registerJob(JobShuffleContext)}
     * @param timeout forwarded to the job context
     * @param expectedPartitions forwarded to the job context
     * @return the future returned by the job context
     * @throws NullPointerException if no context is registered for {@code jobId}
     */
    @Override
    public CompletableFuture<Collection<PartitionWithMetrics>> getPartitionWithMetrics(
            JobID jobId, Duration timeout, Set<ResultPartitionID> expectedPartitions) {
        return getRegisteredJobContext(jobId).getPartitionWithMetrics(timeout, expectedPartitions);
    }

    /**
     * Remembers the job's context under its job id and, if tiered shuffle is active, registers
     * the job with the tiered shuffle master as well.
     *
     * @param context context of the job to register
     */
    @Override
    public void registerJob(JobShuffleContext context) {
        this.jobShuffleContexts.put(context.getJobId(), context);
        if (this.tieredInternalShuffleMaster != null) {
            this.tieredInternalShuffleMaster.registerJob(context);
        }
    }

    /**
     * Forgets the job's context and, if tiered shuffle is active, unregisters the job from the
     * tiered shuffle master as well.
     *
     * @param jobId id of the job to unregister
     */
    @Override
    public void unregisterJob(JobID jobId) {
        this.jobShuffleContexts.remove(jobId);
        if (this.tieredInternalShuffleMaster != null) {
            this.tieredInternalShuffleMaster.unregisterJob(jobId);
        }
    }

    /** Always returns {@code true}. */
    @Override
    public boolean supportsBatchSnapshot() {
        return true;
    }

    /**
     * Completes the given future immediately with the shared {@link EmptyShuffleMasterSnapshot}
     * instance; the snapshot context is not used.
     */
    @Override
    public void snapshotState(
            CompletableFuture<ShuffleMasterSnapshot> snapshotFuture,
            ShuffleMasterSnapshotContext context) {
        snapshotFuture.complete(EmptyShuffleMasterSnapshot.getInstance());
    }

    /**
     * Notifies the {@link JobShuffleContext} registered for the job that partition recovery has
     * started.
     *
     * @param jobId id of a job previously passed to {@link #registerJob(JobShuffleContext)}
     * @throws NullPointerException if no context is registered for {@code jobId}
     */
    @Override
    public void notifyPartitionRecoveryStarted(JobID jobId) {
        getRegisteredJobContext(jobId).notifyPartitionRecoveryStarted();
    }

    /** Closes the tiered shuffle master, if one was created. */
    @Override
    public void close() throws Exception {
        if (this.tieredInternalShuffleMaster != null) {
            this.tieredInternalShuffleMaster.close();
        }
    }

    /**
     * Looks up the context of a registered job, failing with a descriptive
     * {@link NullPointerException} (rather than a message-less one) when the job is unknown.
     */
    private JobShuffleContext getRegisteredJobContext(JobID jobId) {
        return Preconditions.checkNotNull(
                this.jobShuffleContexts.get(jobId),
                "No JobShuffleContext is registered for job %s.",
                jobId);
    }
}

