/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.library.vertexmanager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;

@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ShuffleVertexManager
extends VertexManagerPlugin {
    public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION = "tez.shuffle-vertex-manager.min-src-fraction";
    public static final float TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT = 0.25f;
    public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION = "tez.shuffle-vertex-manager.max-src-fraction";
    public static final float TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
    public static final String TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL = "tez.shuffle-vertex-manager.enable.auto-parallel";
    public static final boolean TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT = false;
    public static final String TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE = "tez.shuffle-vertex-manager.desired-task-input-size";
    public static final long TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT = 0x6400000L;
    public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM = "tez.shuffle-vertex-manager.min-task-parallelism";
    public static final int TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT = 1;
    private static final Log LOG = LogFactory.getLog(ShuffleVertexManager.class);
    float slowStartMinSrcCompletionFraction;
    float slowStartMaxSrcCompletionFraction;
    long desiredTaskInputDataSize = 0x6400000L;
    int minTaskParallelism = 1;
    boolean enableAutoParallelism = false;
    boolean parallelismDetermined = false;
    int totalNumSourceTasks = 0;
    int numSourceTasksCompleted = 0;
    int numVertexManagerEventsReceived = 0;
    List<Integer> pendingTasks;
    int totalTasksToSchedule = 0;
    Map<String, Set<Integer>> bipartiteSources = Maps.newHashMap();
    long completedSourceTasksOutputSize = 0L;

    public ShuffleVertexManager(VertexManagerPluginContext context) {
        super(context);
    }

    public void onVertexStarted(Map<String, List<Integer>> completions) {
        this.pendingTasks = Lists.newArrayListWithCapacity((int)this.getContext().getVertexNumTasks(this.getContext().getVertexName()));
        this.updatePendingTasks();
        this.updateSourceTaskCount();
        LOG.info((Object)("OnVertexStarted vertex: " + this.getContext().getVertexName() + " with " + this.totalNumSourceTasks + " source tasks and " + this.totalTasksToSchedule + " pending tasks"));
        if (completions != null) {
            for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
                for (Integer taskId : entry.getValue()) {
                    this.onSourceTaskCompleted(entry.getKey(), taskId);
                }
            }
        }
        this.schedulePendingTasks();
    }

    public void onSourceTaskCompleted(String srcVertexName, Integer srcTaskId) {
        this.updateSourceTaskCount();
        Set<Integer> completedSourceTasks = this.bipartiteSources.get(srcVertexName);
        if (completedSourceTasks != null) {
            if (completedSourceTasks.add(srcTaskId)) {
                ++this.numSourceTasksCompleted;
            }
            this.schedulePendingTasks();
        }
    }

    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
        if (this.enableAutoParallelism) {
            ShuffleUserPayloads.VertexManagerEventPayloadProto proto;
            try {
                proto = ShuffleUserPayloads.VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom((ByteBuffer)vmEvent.getUserPayload()));
            }
            catch (InvalidProtocolBufferException e) {
                throw new TezUncheckedException((Throwable)e);
            }
            long sourceTaskOutputSize = proto.getOutputSize();
            ++this.numVertexManagerEventsReceived;
            this.completedSourceTasksOutputSize += sourceTaskOutputSize;
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Received info of output size: " + sourceTaskOutputSize + " numInfoReceived: " + this.numVertexManagerEventsReceived + " total output size: " + this.completedSourceTasksOutputSize));
            }
        }
    }

    void updatePendingTasks() {
        this.pendingTasks.clear();
        for (int i = 0; i < this.getContext().getVertexNumTasks(this.getContext().getVertexName()); ++i) {
            this.pendingTasks.add(new Integer(i));
        }
        this.totalTasksToSchedule = this.pendingTasks.size();
    }

    void updateSourceTaskCount() {
        int numSrcTasks = 0;
        for (String vertex : this.bipartiteSources.keySet()) {
            numSrcTasks += this.getContext().getVertexNumTasks(vertex);
        }
        this.totalNumSourceTasks = numSrcTasks;
    }

    @VisibleForTesting
    boolean determineParallelismAndApply() {
        boolean canDetermineParallelismLater;
        if (this.numSourceTasksCompleted == 0) {
            return true;
        }
        if (this.numVertexManagerEventsReceived == 0) {
            return true;
        }
        int currentParallelism = this.pendingTasks.size();
        boolean bl = canDetermineParallelismLater = this.completedSourceTasksOutputSize < this.desiredTaskInputDataSize && (float)this.numSourceTasksCompleted < (float)this.totalNumSourceTasks * this.slowStartMaxSrcCompletionFraction;
        if (canDetermineParallelismLater) {
            LOG.info((Object)("Defer scheduling tasks; vertex=" + this.getContext().getVertexName() + ", totalNumSourceTasks=" + this.totalNumSourceTasks + ", completedSourceTasksOutputSize=" + this.completedSourceTasksOutputSize + ", numVertexManagerEventsReceived=" + this.numVertexManagerEventsReceived + ", numSourceTasksCompleted=" + this.numSourceTasksCompleted + ", maxThreshold=" + (float)this.totalNumSourceTasks * this.slowStartMaxSrcCompletionFraction));
            return false;
        }
        long expectedTotalSourceTasksOutputSize = (long)this.totalNumSourceTasks * this.completedSourceTasksOutputSize / (long)this.numVertexManagerEventsReceived;
        int desiredTaskParallelism = (int)((expectedTotalSourceTasksOutputSize + this.desiredTaskInputDataSize - 1L) / this.desiredTaskInputDataSize);
        if (desiredTaskParallelism < this.minTaskParallelism) {
            desiredTaskParallelism = this.minTaskParallelism;
        }
        if (desiredTaskParallelism >= currentParallelism) {
            return true;
        }
        int basePartitionRange = currentParallelism / desiredTaskParallelism;
        if (basePartitionRange <= 1) {
            return true;
        }
        int numShufflersWithBaseRange = currentParallelism / basePartitionRange;
        int remainderRangeForLastShuffler = currentParallelism % basePartitionRange;
        int finalTaskParallelism = remainderRangeForLastShuffler > 0 ? numShufflersWithBaseRange + 1 : numShufflersWithBaseRange;
        LOG.info((Object)("Reduce auto parallelism for vertex: " + this.getContext().getVertexName() + " to " + finalTaskParallelism + " from " + this.pendingTasks.size() + " . Expected output: " + expectedTotalSourceTasksOutputSize + " based on actual output: " + this.completedSourceTasksOutputSize + " from " + this.numVertexManagerEventsReceived + " vertex manager events. " + " desiredTaskInputSize: " + this.desiredTaskInputDataSize + " max slow start tasks:" + (float)this.totalNumSourceTasks * this.slowStartMaxSrcCompletionFraction + " num sources completed:" + this.numSourceTasksCompleted));
        if (finalTaskParallelism < currentParallelism) {
            HashMap<String, EdgeManagerPluginDescriptor> edgeManagers = new HashMap<String, EdgeManagerPluginDescriptor>(this.bipartiteSources.size());
            for (String vertex : this.bipartiteSources.keySet()) {
                CustomShuffleEdgeManagerConfig edgeManagerConfig = new CustomShuffleEdgeManagerConfig(currentParallelism, finalTaskParallelism, basePartitionRange, remainderRangeForLastShuffler > 0 ? remainderRangeForLastShuffler : basePartitionRange);
                EdgeManagerPluginDescriptor edgeManagerDescriptor = EdgeManagerPluginDescriptor.create((String)CustomShuffleEdgeManager.class.getName());
                edgeManagerDescriptor.setUserPayload(edgeManagerConfig.toUserPayload());
                edgeManagers.put(vertex, edgeManagerDescriptor);
            }
            this.getContext().setVertexParallelism(finalTaskParallelism, null, edgeManagers, null);
            this.updatePendingTasks();
        }
        return true;
    }

    void schedulePendingTasks(int numTasksToSchedule) {
        if (this.enableAutoParallelism && !this.parallelismDetermined) {
            this.parallelismDetermined = this.determineParallelismAndApply();
            if (!this.parallelismDetermined) {
                return;
            }
        }
        ArrayList scheduledTasks = Lists.newArrayListWithCapacity((int)numTasksToSchedule);
        while (!this.pendingTasks.isEmpty() && numTasksToSchedule > 0) {
            --numTasksToSchedule;
            scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(this.pendingTasks.get(0), null));
            this.pendingTasks.remove(0);
        }
        this.getContext().scheduleVertexTasks((List)scheduledTasks);
    }

    void schedulePendingTasks() {
        int numPendingTasks = this.pendingTasks.size();
        if (numPendingTasks == 0) {
            return;
        }
        if (this.numSourceTasksCompleted == this.totalNumSourceTasks && numPendingTasks > 0) {
            LOG.info((Object)("All source tasks assigned. Ramping up " + numPendingTasks + " remaining tasks for vertex: " + this.getContext().getVertexName()));
            this.schedulePendingTasks(numPendingTasks);
            return;
        }
        float completedSourceTaskFraction = 0.0f;
        completedSourceTaskFraction = this.totalNumSourceTasks != 0 ? (float)this.numSourceTasksCompleted / (float)this.totalNumSourceTasks : 1.0f;
        float tasksFractionToSchedule = 1.0f;
        float percentRange = this.slowStartMaxSrcCompletionFraction - this.slowStartMinSrcCompletionFraction;
        if (percentRange > 0.0f) {
            tasksFractionToSchedule = (completedSourceTaskFraction - this.slowStartMinSrcCompletionFraction) / percentRange;
        } else if (completedSourceTaskFraction < this.slowStartMinSrcCompletionFraction) {
            tasksFractionToSchedule = 0.0f;
        }
        if (tasksFractionToSchedule > 1.0f) {
            tasksFractionToSchedule = 1.0f;
        } else if (tasksFractionToSchedule < 0.0f) {
            tasksFractionToSchedule = 0.0f;
        }
        int numTasksToSchedule = (int)(tasksFractionToSchedule * (float)this.totalTasksToSchedule) - (this.totalTasksToSchedule - numPendingTasks);
        if (numTasksToSchedule > 0) {
            LOG.info((Object)("Scheduling " + numTasksToSchedule + " tasks for vertex: " + this.getContext().getVertexName() + " with totalTasks: " + this.totalTasksToSchedule + ". " + this.numSourceTasksCompleted + " source tasks completed out of " + this.totalNumSourceTasks + ". SourceTaskCompletedFraction: " + completedSourceTaskFraction + " min: " + this.slowStartMinSrcCompletionFraction + " max: " + this.slowStartMaxSrcCompletionFraction));
            this.schedulePendingTasks(numTasksToSchedule);
        }
    }

    public void initialize() {
        Configuration conf;
        try {
            conf = TezUtils.createConfFromUserPayload((UserPayload)this.getContext().getUserPayload());
        }
        catch (IOException e) {
            throw new TezUncheckedException((Throwable)e);
        }
        this.slowStartMinSrcCompletionFraction = conf.getFloat(TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 0.25f);
        this.slowStartMaxSrcCompletionFraction = conf.getFloat(TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 0.75f);
        if (this.slowStartMinSrcCompletionFraction < 0.0f || this.slowStartMaxSrcCompletionFraction < this.slowStartMinSrcCompletionFraction) {
            throw new IllegalArgumentException("Invalid values for slowStartMinSrcCompletionFraction/slowStartMaxSrcCompletionFraction. Min cannot be < 0 and max cannot be < min.");
        }
        this.enableAutoParallelism = conf.getBoolean(TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, false);
        this.desiredTaskInputDataSize = conf.getLong(TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 0x6400000L);
        this.minTaskParallelism = Math.max(1, conf.getInt(TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM, 1));
        LOG.info((Object)("Shuffle Vertex Manager: settings minFrac:" + this.slowStartMinSrcCompletionFraction + " maxFrac:" + this.slowStartMaxSrcCompletionFraction + " auto:" + this.enableAutoParallelism + " desiredTaskIput:" + this.desiredTaskInputDataSize + " minTasks:" + this.minTaskParallelism));
        Map inputs = this.getContext().getInputVertexEdgeProperties();
        for (Map.Entry entry : inputs.entrySet()) {
            if (((EdgeProperty)entry.getValue()).getDataMovementType() != EdgeProperty.DataMovementType.SCATTER_GATHER) continue;
            String vertex = (String)entry.getKey();
            this.bipartiteSources.put(vertex, new HashSet());
        }
        if (this.bipartiteSources.isEmpty()) {
            throw new TezUncheckedException("Atleast 1 bipartite source should exist");
        }
    }

    public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
    }

    public static ShuffleVertexManagerConfigBuilder createConfigBuilder(@Nullable Configuration conf) {
        return new ShuffleVertexManagerConfigBuilder(conf);
    }

    public static final class ShuffleVertexManagerConfigBuilder {
        private final Configuration conf;

        private ShuffleVertexManagerConfigBuilder(@Nullable Configuration conf) {
            this.conf = conf == null ? new Configuration(false) : conf;
        }

        public ShuffleVertexManagerConfigBuilder setAutoReduceParallelism(boolean enabled) {
            this.conf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, enabled);
            return this;
        }

        public ShuffleVertexManagerConfigBuilder setSlowStartMinSrcCompletionFraction(float minFraction) {
            this.conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, minFraction);
            return this;
        }

        public ShuffleVertexManagerConfigBuilder setSlowStartMaxSrcCompletionFraction(float maxFraction) {
            this.conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, maxFraction);
            return this;
        }

        public ShuffleVertexManagerConfigBuilder setDesiredTaskInputSize(long desiredTaskInputSize) {
            this.conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, desiredTaskInputSize);
            return this;
        }

        public ShuffleVertexManagerConfigBuilder setMinTaskParallelism(int minTaskParallelism) {
            this.conf.setInt(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM, minTaskParallelism);
            return this;
        }

        public VertexManagerPluginDescriptor build() {
            VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create((String)ShuffleVertexManager.class.getName());
            try {
                return (VertexManagerPluginDescriptor)desc.setUserPayload(TezUtils.createUserPayloadFromConf((Configuration)this.conf));
            }
            catch (IOException e) {
                throw new TezUncheckedException((Throwable)e);
            }
        }
    }

    private static class CustomShuffleEdgeManagerConfig {
        int numSourceTaskOutputs;
        int numDestinationTasks;
        int basePartitionRange;
        int remainderRangeForLastShuffler;

        private CustomShuffleEdgeManagerConfig(int numSourceTaskOutputs, int numDestinationTasks, int basePartitionRange, int remainderRangeForLastShuffler) {
            this.numSourceTaskOutputs = numSourceTaskOutputs;
            this.numDestinationTasks = numDestinationTasks;
            this.basePartitionRange = basePartitionRange;
            this.remainderRangeForLastShuffler = remainderRangeForLastShuffler;
        }

        public UserPayload toUserPayload() {
            return UserPayload.create((ByteBuffer)ByteBuffer.wrap(ShuffleUserPayloads.ShuffleEdgeManagerConfigPayloadProto.newBuilder().setNumSourceTaskOutputs(this.numSourceTaskOutputs).setNumDestinationTasks(this.numDestinationTasks).setBasePartitionRange(this.basePartitionRange).setRemainderRangeForLastShuffler(this.remainderRangeForLastShuffler).build().toByteArray()));
        }

        public static CustomShuffleEdgeManagerConfig fromUserPayload(UserPayload payload) throws InvalidProtocolBufferException {
            ShuffleUserPayloads.ShuffleEdgeManagerConfigPayloadProto proto = ShuffleUserPayloads.ShuffleEdgeManagerConfigPayloadProto.parseFrom(ByteString.copyFrom((ByteBuffer)payload.getPayload()));
            return new CustomShuffleEdgeManagerConfig(proto.getNumSourceTaskOutputs(), proto.getNumDestinationTasks(), proto.getBasePartitionRange(), proto.getRemainderRangeForLastShuffler());
        }
    }

    public static class CustomShuffleEdgeManager
    extends EdgeManagerPlugin {
        int numSourceTaskOutputs;
        int numDestinationTasks;
        int basePartitionRange;
        int remainderRangeForLastShuffler;
        int numSourceTasks;

        public CustomShuffleEdgeManager(EdgeManagerPluginContext context) {
            super(context);
        }

        public void initialize() {
            CustomShuffleEdgeManagerConfig config;
            UserPayload userPayload = this.getContext().getUserPayload();
            if (userPayload == null || userPayload.getPayload() == null || userPayload.getPayload().limit() == 0) {
                throw new RuntimeException("Could not initialize CustomShuffleEdgeManager from provided user payload");
            }
            try {
                config = CustomShuffleEdgeManagerConfig.fromUserPayload(userPayload);
            }
            catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("Could not initialize CustomShuffleEdgeManager from provided user payload", e);
            }
            this.numSourceTaskOutputs = config.numSourceTaskOutputs;
            this.numDestinationTasks = config.numDestinationTasks;
            this.basePartitionRange = config.basePartitionRange;
            this.remainderRangeForLastShuffler = config.remainderRangeForLastShuffler;
            this.numSourceTasks = this.getContext().getSourceVertexNumTasks();
            Preconditions.checkState((this.numDestinationTasks == this.getContext().getDestinationVertexNumTasks() ? 1 : 0) != 0);
        }

        public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
            int partitionRange = 1;
            partitionRange = destinationTaskIndex < this.numDestinationTasks - 1 ? this.basePartitionRange : this.remainderRangeForLastShuffler;
            return this.numSourceTasks * partitionRange;
        }

        public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
            return this.numSourceTaskOutputs;
        }

        public void routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
            int sourceIndex = event.getSourceIndex();
            int destinationTaskIndex = sourceIndex / this.basePartitionRange;
            int partitionRange = 1;
            partitionRange = destinationTaskIndex < this.numDestinationTasks - 1 ? this.basePartitionRange : this.remainderRangeForLastShuffler;
            int targetIndex = sourceTaskIndex * partitionRange + sourceIndex % partitionRange;
            destinationTaskAndInputIndices.put(new Integer(destinationTaskIndex), Collections.singletonList(new Integer(targetIndex)));
        }

        public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
            if (this.remainderRangeForLastShuffler < this.basePartitionRange) {
                int i;
                int startOffset = sourceTaskIndex * this.basePartitionRange;
                ArrayList allIndices = Lists.newArrayListWithCapacity((int)this.basePartitionRange);
                for (int i2 = 0; i2 < this.basePartitionRange; ++i2) {
                    allIndices.add(startOffset + i2);
                }
                List inputIndices = Collections.unmodifiableList(allIndices);
                for (i = 0; i < this.numDestinationTasks - 1; ++i) {
                    destinationTaskAndInputIndices.put(i, inputIndices);
                }
                startOffset = sourceTaskIndex * this.remainderRangeForLastShuffler;
                allIndices = Lists.newArrayListWithCapacity((int)this.remainderRangeForLastShuffler);
                for (i = 0; i < this.remainderRangeForLastShuffler; ++i) {
                    allIndices.add(startOffset + i);
                }
                inputIndices = Collections.unmodifiableList(allIndices);
                destinationTaskAndInputIndices.put(this.numDestinationTasks - 1, inputIndices);
            } else {
                int startOffset = sourceTaskIndex * this.basePartitionRange;
                ArrayList allIndices = Lists.newArrayListWithCapacity((int)this.basePartitionRange);
                for (int i = 0; i < this.basePartitionRange; ++i) {
                    allIndices.add(startOffset + i);
                }
                List inputIndices = Collections.unmodifiableList(allIndices);
                for (int i = 0; i < this.numDestinationTasks; ++i) {
                    destinationTaskAndInputIndices.put(i, inputIndices);
                }
            }
        }

        public int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex, int destinationFailedInputIndex) {
            int partitionRange = 1;
            partitionRange = destinationTaskIndex < this.numDestinationTasks - 1 ? this.basePartitionRange : this.remainderRangeForLastShuffler;
            return destinationFailedInputIndex / partitionRange;
        }

        public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
            return this.numDestinationTasks;
        }
    }
}

