/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.library.vertexmanager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.Inflater;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.io.NonSyncByteArrayInputStream;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.TaskIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Evolving
abstract class ShuffleVertexManagerBase
extends VertexManagerPlugin {
    // 0x100000 == 1,048,576 (2^20); presumably the number of bytes in one MB -- confirm against users of this constant.
    static long MB = 0x100000L;
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleVertexManagerBase.class);
    // Latest routing decision stored by determineParallelismAndApply(float); starts as WAIT.
    ComputeRoutingAction computeRoutingAction = ComputeRoutingAction.WAIT;
    // Sum of numTasks over all configured SCATTER_GATHER source vertices (see handleVertexStateUpdate).
    int totalNumBipartiteSourceTasks = 0;
    // Number of distinct SCATTER_GATHER source tasks seen by onSourceTaskCompleted.
    int numBipartiteSourceTasksCompleted = 0;
    // Number of vertex manager events handled after de-duplication by producer task.
    int numVertexManagerEventsReceived = 0;
    // Events received before onVertexStarted finished; replayed and cleared inside onVertexStarted.
    List<VertexManagerEvent> pendingVMEvents = Lists.newLinkedList();
    // Set to true as the last bookkeeping step of onVertexStarted.
    AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
    // Producer tasks for which a vertex manager event has already been handled.
    private Set<TaskIdentifier> taskWithVmEvents = Sets.newHashSet();
    // Per-source-vertex bookkeeping, keyed by source vertex name.
    private final Map<String, SourceVertexInfo> srcVertexInfo = Maps.newConcurrentMap();
    // Set to true by canScheduleTasks() once every source vertex is marked configured.
    boolean sourceVerticesScheduled = false;
    // Number of input edges whose data movement type is SCATTER_GATHER.
    @VisibleForTesting
    int bipartiteSources = 0;
    // Running sum of the output sizes reported through vertex manager event payloads.
    long completedSourceTasksOutputSize = 0L;
    // CONFIGURED notifications received before onVertexStarted finished; replayed inside onVertexStarted.
    List<VertexStateUpdate> pendingStateUpdates = Lists.newArrayList();
    // One entry per task index of this vertex; rebuilt by updatePendingTasks().
    List<PendingTaskInfo> pendingTasks = Lists.newLinkedList();
    // Size of pendingTasks at the time of the last rebuild in updatePendingTasks().
    int totalTasksToSchedule = 0;
    // Configuration decoded from the plugin's user payload in initialize().
    @VisibleForTesting
    Configuration conf;
    // Settings object returned by initConfiguration(); assigned in initialize().
    ShuffleVertexManagerBaseConfig config;
    // Reused for decompressing partition stats in handleVertexManagerEvent.
    final Inflater inflater = TezCommonUtils.newInflater();

    /**
     * Factory hook for the per-source bookkeeping object stored in {@code srcVertexInfo}.
     *
     * @param edgeProperty edge connecting the source vertex to this vertex
     * @param numTasks     length of the per-index stats array allocated by {@link SourceVertexInfo}
     * @return a freshly created {@link SourceVertexInfo}
     */
    SourceVertexInfo createSourceVertexInfo(EdgeProperty edgeProperty, int numTasks) {
        SourceVertexInfo created = new SourceVertexInfo(edgeProperty, numTasks);
        return created;
    }

    /**
     * Creates the manager and hands the plugin context to the {@link VertexManagerPlugin} base class.
     *
     * @param context plugin context, later retrieved through {@code getContext()}
     */
    public ShuffleVertexManagerBase(VertexManagerPluginContext context) {
        super(context);
    }

    /**
     * Registers every input edge, replays whatever was buffered before start-up, and then
     * attempts a first scheduling pass.
     *
     * <p>Order of work: build {@code srcVertexInfo} and subscribe to CONFIGURED notifications,
     * run {@link #onVertexStartedCheck()}, replay buffered state updates, rebuild the pending
     * task list, replay buffered vertex manager events, account for the supplied completions,
     * flip {@code onVertexStartedDone}, and finally call {@code processPendingTasks(null)}.
     *
     * @param completions source task attempts that already completed; may be {@code null}
     */
    public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions) {
        // Typed map/entries: the raw types emitted by the decompiler do not compile in the
        // enhanced-for loop (a raw entry set iterates as Object).
        Map<String, EdgeProperty> inputs = this.getContext().getInputVertexEdgeProperties();
        for (Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
            String srcVertexName = entry.getKey();
            EdgeProperty edgeProperty = entry.getValue();
            // The stats array is sized by THIS vertex's task count, not by the source's.
            int numTasks = this.getContext().getVertexNumTasks(this.getContext().getVertexName());
            this.srcVertexInfo.put(srcVertexName, this.createSourceVertexInfo(edgeProperty, numTasks));
            this.getContext().registerForVertexStateUpdates(srcVertexName, EnumSet.of(VertexState.CONFIGURED));
            if (edgeProperty.getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER) {
                ++this.bipartiteSources;
            }
        }
        this.onVertexStartedCheck();
        // Replay notifications that arrived before this method ran.
        for (VertexStateUpdate stateUpdate : this.pendingStateUpdates) {
            this.handleVertexStateUpdate(stateUpdate);
        }
        this.pendingStateUpdates.clear();
        this.updatePendingTasks();
        for (VertexManagerEvent vmEvent : this.pendingVMEvents) {
            this.handleVertexManagerEvent(vmEvent);
        }
        this.pendingVMEvents.clear();
        LOG.info("OnVertexStarted vertex: {} with {} source tasks and {} pending tasks", new Object[]{this.getContext().getVertexName(), this.totalNumBipartiteSourceTasks, this.totalTasksToSchedule});
        if (completions != null) {
            for (TaskAttemptIdentifier attempt : completions) {
                this.onSourceTaskCompleted(attempt);
            }
        }
        // Only from here on do processPendingTasks calls pass preconditionsSatisfied().
        this.onVertexStartedDone.set(true);
        this.processPendingTasks(null);
    }

    /**
     * Start-up sanity check: at least one SCATTER_GATHER input edge must have been counted.
     *
     * @throws TezUncheckedException if {@code bipartiteSources} is zero
     */
    protected void onVertexStartedCheck() {
        if (this.bipartiteSources != 0) {
            return;
        }
        throw new TezUncheckedException("At least 1 bipartite source should exist");
    }

    /**
     * Records the completion of a source task and triggers a scheduling pass.
     *
     * <p>Each source task is counted once, however many attempts report completion. Only tasks
     * from SCATTER_GATHER sources contribute to {@code numBipartiteSourceTasksCompleted}.
     *
     * @param attempt the completed source task attempt
     * @throws IllegalStateException if the attempt belongs to a vertex that is not a registered
     *         source, or if its task id is outside the configured task range of that source
     */
    public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
        String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
        int srcTaskId = attempt.getTaskIdentifier().getIdentifier();
        SourceVertexInfo srcInfo = this.srcVertexInfo.get(srcVertexName);
        // Fail with a descriptive message instead of a bare NullPointerException when the
        // completion comes from a vertex that was never registered as a source.
        Preconditions.checkState(srcInfo != null, "Received completion for srcTaskId " + srcTaskId + " from unknown source vertex: " + srcVertexName);
        if (srcInfo.vertexIsConfigured) {
            // numTasks is only meaningful once the CONFIGURED notification has been handled.
            Preconditions.checkState(srcTaskId < srcInfo.numTasks, "Received completion for srcTaskId " + srcTaskId + " but Vertex: " + srcVertexName + " has only " + srcInfo.numTasks + " tasks");
        }
        BitSet completedSourceTasks = srcInfo.finishedTaskSet;
        if (!completedSourceTasks.get(srcTaskId)) {
            completedSourceTasks.set(srcTaskId);
            if (srcInfo.edgeProperty.getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER) {
                ++this.numBipartiteSourceTasksCompleted;
            }
        }
        this.processPendingTasks(attempt);
    }

    /**
     * Folds a bitmap-encoded partition size report into {@code srcInfo.statsInMB}.
     *
     * <p>Each set bit position encodes two values: {@code position / rangeCount} is the slot in
     * {@code statsInMB} and {@code position % rangeCount} selects a {@link DATA_RANGE_IN_MB}
     * constant. The constant's size is added to the slot when it is greater than zero.
     *
     * @param srcInfo        source vertex bookkeeping whose stats array is updated
     * @param partitionStats bitmap of encoded positions
     * @throws IllegalStateException if {@code srcInfo.statsInMB} is {@code null}
     */
    @VisibleForTesting
    void parsePartitionStats(SourceVertexInfo srcInfo, RoaringBitmap partitionStats) {
        Preconditions.checkState(srcInfo.statsInMB != null, (Object)"Stats should be initialized");
        DATA_RANGE_IN_MB[] ranges = DATA_RANGE_IN_MB.values();
        int rangeCount = ranges.length;
        for (int encodedPosition : partitionStats) {
            int sizeInMb = ranges[encodedPosition % rangeCount].getSizeInMB();
            if (sizeInMb > 0) {
                srcInfo.statsInMB[encodedPosition / rangeCount] += sizeInMb;
            }
        }
    }

    /**
     * Adds a detailed partition size report to {@code srcInfo.statsInMB}, slot by slot.
     *
     * @param srcInfo        source vertex bookkeeping whose stats array is updated
     * @param partitionStats sizes to add; the element at position {@code i} is added to slot {@code i}
     */
    void parseDetailedPartitionStats(SourceVertexInfo srcInfo, List<Integer> partitionStats) {
        int slot = 0;
        while (slot < partitionStats.size()) {
            srcInfo.statsInMB[slot] += partitionStats.get(slot);
            ++slot;
        }
    }

    /**
     * Entry point for vertex manager events: buffers the event while start-up is still in
     * progress, otherwise handles it immediately.
     *
     * @param vmEvent the received event
     */
    public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
        if (!this.onVertexStartedDone.get()) {
            // Replayed later from onVertexStarted.
            this.pendingVMEvents.add(vmEvent);
            return;
        }
        this.handleVertexManagerEvent(vmEvent);
    }

    /**
     * Applies one vertex manager event to the per-source and global statistics.
     *
     * <p>Only the first event per producer task is used; later ones are logged and dropped.
     * The global event counter is bumped for every accepted event, while the per-source
     * counter, per-source output size and global output size change only when the event
     * carries a payload.
     *
     * @param vmEvent event to process
     * @throws IllegalStateException if the producer's vertex is not a registered source
     * @throws TezUncheckedException if the payload or its partition stats cannot be decoded
     */
    private void handleVertexManagerEvent(VertexManagerEvent vmEvent) {
        TaskIdentifier producerTask = vmEvent.getProducerAttemptIdentifier().getTaskIdentifier();
        boolean firstEventForTask = this.taskWithVmEvents.add(producerTask);
        if (!firstEventForTask) {
            LOG.info("Ignoring vertex manager event from: {}", producerTask);
            return;
        }
        String producerVertexName = producerTask.getVertexIdentifier().getName();
        SourceVertexInfo srcInfo = this.srcVertexInfo.get(producerVertexName);
        Preconditions.checkState(srcInfo != null, "Unknown vmEvent from " + producerTask);
        ++this.numVertexManagerEventsReceived;

        long sourceTaskOutputSize = 0L;
        ByteBuffer payload = vmEvent.getUserPayload();
        if (payload != null) {
            ShuffleUserPayloads.VertexManagerEventPayloadProto proto;
            try {
                proto = ShuffleUserPayloads.VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(payload));
            }
            catch (InvalidProtocolBufferException e) {
                throw new TezUncheckedException(e);
            }
            sourceTaskOutputSize = proto.getOutputSize();
            // The bitmap form takes precedence over the detailed list when both are present.
            if (proto.hasPartitionStats()) {
                this.parsePartitionStats(srcInfo, this.decodePartitionStatsBitmap(proto.getPartitionStats()));
            } else if (proto.hasDetailedPartitionStats()) {
                this.parseDetailedPartitionStats(srcInfo, proto.getDetailedPartitionStats().getSizeInMbList());
            }
            srcInfo.numVMEventsReceived++;
            srcInfo.outputSize += sourceTaskOutputSize;
            this.completedSourceTasksOutputSize += sourceTaskOutputSize;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("For attempt: {} received info of output size: {} vertex numEventsReceived: {} vertex output size: {} total numEventsReceived: {} total output size: {}", new Object[]{vmEvent.getProducerAttemptIdentifier(), sourceTaskOutputSize, srcInfo.numVMEventsReceived, srcInfo.outputSize, this.numVertexManagerEventsReceived, this.completedSourceTasksOutputSize});
        }
    }

    /** Decompresses and deserializes a partition stats bitmap, wrapping I/O failures as unchecked. */
    private RoaringBitmap decodePartitionStatsBitmap(ByteString compressedPartitionStats) {
        try {
            RoaringBitmap bitmap = new RoaringBitmap();
            byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(compressedPartitionStats, this.inflater);
            NonSyncByteArrayInputStream rawStream = new NonSyncByteArrayInputStream(rawData);
            bitmap.deserialize((DataInput)new DataInputStream(rawStream));
            return bitmap;
        }
        catch (IOException e) {
            throw new TezUncheckedException(e);
        }
    }

    /**
     * Rebuilds {@code pendingTasks} with one entry per task index of this vertex.
     *
     * <p>Nothing happens when the reported task count is not positive or already equals the
     * current list size; otherwise the list is cleared, refilled with indices
     * {@code 0..tasks-1}, and {@code totalTasksToSchedule} is set to the new size.
     */
    void updatePendingTasks() {
        int taskCount = this.getContext().getVertexNumTasks(this.getContext().getVertexName());
        boolean needsRebuild = taskCount > 0 && taskCount != this.pendingTasks.size();
        if (!needsRebuild) {
            return;
        }
        this.pendingTasks.clear();
        int taskIndex = 0;
        while (taskIndex < taskCount) {
            this.pendingTasks.add(new PendingTaskInfo(taskIndex));
            ++taskIndex;
        }
        this.totalTasksToSchedule = this.pendingTasks.size();
    }

    /**
     * Decides whether routing should be computed now, skipped, or deferred.
     *
     * <ul>
     *   <li>WAIT when no task is schedulable yet and not all bipartite source tasks are done.</li>
     *   <li>SKIP when no vertex manager event was received although bipartite source tasks exist.</li>
     *   <li>WAIT when the reported output size is below the desired task input size and the
     *       completed fraction is still below the configured maximum fraction.</li>
     *   <li>COMPUTE otherwise.</li>
     * </ul>
     *
     * @param minSourceVertexCompletedTaskFraction lowest completed-task fraction among bipartite sources
     * @return the routing action to take
     */
    private ComputeRoutingAction getComputeRoutingAction(float minSourceVertexCompletedTaskFraction) {
        // Evaluated first on purpose: getNumOfTasksToSchedule may log.
        boolean nothingSchedulable = this.getNumOfTasksToSchedule(minSourceVertexCompletedTaskFraction) <= 0;
        if (nothingSchedulable && this.numBipartiteSourceTasksCompleted != this.totalNumBipartiteSourceTasks) {
            return ComputeRoutingAction.WAIT;
        }
        if (this.totalNumBipartiteSourceTasks > 0 && this.numVertexManagerEventsReceived == 0) {
            return ComputeRoutingAction.SKIP;
        }
        boolean enoughData = this.completedSourceTasksOutputSize >= this.config.getDesiredTaskInputDataSize();
        if (enoughData || !(minSourceVertexCompletedTaskFraction < this.config.getMaxFraction())) {
            return ComputeRoutingAction.COMPUTE;
        }
        LOG.info("Defer scheduling tasks; vertex = {}, totalNumBipartiteSourceTasks = {}, completedSourceTasksOutputSize = {}, numVertexManagerEventsReceived = {}, numBipartiteSourceTasksCompleted = {}, minSourceVertexCompletedTaskFraction = {}", new Object[]{this.getContext().getVertexName(), this.totalNumBipartiteSourceTasks, this.completedSourceTasksOutputSize, this.numVertexManagerEventsReceived, this.numBipartiteSourceTasksCompleted, Float.valueOf(minSourceVertexCompletedTaskFraction)});
        return ComputeRoutingAction.WAIT;
    }

    /**
     * Extrapolates the total output size of all bipartite sources from the events seen so far.
     *
     * <p>For each SCATTER_GATHER source with tasks and at least one event, the estimate is
     * {@code outputSize * numTasks / numVMEventsReceived} (integer division); the estimates are
     * summed. Sources without tasks or events contribute nothing.
     *
     * @return the summed estimate, {@link BigInteger#ZERO} when no source qualifies
     */
    BigInteger getExpectedTotalBipartiteSourceTasksOutputSize() {
        BigInteger total = BigInteger.ZERO;
        for (Map.Entry<String, SourceVertexInfo> bipartiteEntry : this.getBipartiteInfo()) {
            SourceVertexInfo info = bipartiteEntry.getValue();
            if (info.numTasks > 0 && info.numVMEventsReceived > 0) {
                BigInteger scaledOutput = BigInteger.valueOf(info.outputSize).multiply(BigInteger.valueOf(info.numTasks));
                total = total.add(scaledOutput.divide(BigInteger.valueOf(info.numVMEventsReceived)));
            }
        }
        return total;
    }

    /**
     * Sums {@code statsInMB[index]} over every source vertex.
     *
     * @param index slot in each source's stats array
     * @return the summed value reported so far for that slot
     */
    int getCurrentlyKnownStatsAtIndex(int index) {
        int knownTotal = 0;
        for (SourceVertexInfo info : this.getAllSourceVertexInfo()) {
            knownTotal = knownTotal + info.statsInMB[index];
        }
        return knownTotal;
    }

    /**
     * Sums {@link SourceVertexInfo#getExpectedStatsInMBAtIndex(int)} over every source vertex.
     *
     * @param index slot in each source's stats array
     * @return the summed extrapolated value for that slot
     */
    int getExpectedStatsAtIndex(int index) {
        int expectedTotal = 0;
        for (SourceVertexInfo info : this.getAllSourceVertexInfo()) {
            expectedTotal = expectedTotal + info.getExpectedStatsInMBAtIndex(index);
        }
        return expectedTotal;
    }

    /**
     * Subclass hook called by {@code determineParallelismAndApply(float)} when the routing
     * action is COMPUTE.
     *
     * @return parameters whose final parallelism is passed to {@code reconfigVertex}, or
     *         {@code null} to leave the vertex as it is
     */
    abstract ReconfigVertexParams computeRouting();

    /**
     * Subclass hook called by {@code determineParallelismAndApply(float)} after the vertex has
     * been reconfigured and {@code updatePendingTasks()} has run.
     */
    abstract void postReconfigVertex();

    /**
     * Settles the routing decision if it is still WAIT and applies it.
     *
     * <p>On COMPUTE with non-null routing parameters the vertex is reconfigured, the pending
     * task list rebuilt and {@link #postReconfigVertex()} invoked. Any non-WAIT decision is
     * reported to the context through {@code doneReconfiguringVertex()}. Once the stored
     * decision is no longer WAIT, later calls only report it.
     *
     * @param minSourceVertexCompletedTaskFraction lowest completed-task fraction among bipartite sources
     * @return {@code true} when the stored decision is no longer WAIT
     */
    @VisibleForTesting
    boolean determineParallelismAndApply(float minSourceVertexCompletedTaskFraction) {
        if (this.computeRoutingAction != ComputeRoutingAction.WAIT) {
            return this.computeRoutingAction.determined();
        }
        ComputeRoutingAction decision = this.getComputeRoutingAction(minSourceVertexCompletedTaskFraction);
        if (decision == ComputeRoutingAction.COMPUTE) {
            ReconfigVertexParams params = this.computeRouting();
            if (params != null) {
                this.reconfigVertex(params.getFinalParallelism());
                this.updatePendingTasks();
                this.postReconfigVertex();
            }
        }
        if (decision != ComputeRoutingAction.WAIT) {
            this.getContext().doneReconfiguringVertex();
        }
        this.computeRoutingAction = decision;
        return this.computeRoutingAction.determined();
    }

    /**
     * Convenience overload that uses the current minimum completed-task fraction.
     *
     * @return {@code true} when the routing decision is no longer WAIT
     */
    private boolean determineParallelismAndApply() {
        float currentMinFraction = this.getMinSourceVertexCompletedTaskFraction();
        return this.determineParallelismAndApply(currentMinFraction);
    }

    /**
     * Subclass hook called by {@code schedulePendingTasks} to pick the tasks to schedule now.
     *
     * @param var1 the source attempt whose completion triggered this pass; {@code null} when the
     *             pass was not triggered by a completion
     * @return requests to hand to the context; a {@code null} or empty list schedules nothing
     */
    abstract List<VertexManagerPluginContext.ScheduleTaskRequest> getTasksToSchedule(TaskAttemptIdentifier var1);

    /**
     * Subclass hook called by {@code processPendingTasks(TaskAttemptIdentifier)} immediately
     * before tasks are selected and scheduled.
     */
    abstract void processPendingTasks();

    /**
     * Asks the subclass which tasks to schedule and forwards a non-empty answer to the context.
     *
     * @param completedSourceAttempt the completion that triggered this pass, or {@code null}
     */
    private void schedulePendingTasks(TaskAttemptIdentifier completedSourceAttempt) {
        List<VertexManagerPluginContext.ScheduleTaskRequest> requests = this.getTasksToSchedule(completedSourceAttempt);
        if (requests == null || requests.size() <= 0) {
            return;
        }
        this.getContext().scheduleTasks(requests);
    }

    /**
     * @return a view over the bookkeeping objects of all registered source vertices
     */
    Iterable<SourceVertexInfo> getAllSourceVertexInfo() {
        Iterable<SourceVertexInfo> allInfos = this.srcVertexInfo.values();
        return allInfos;
    }

    /**
     * Looks up the bookkeeping object of a single source vertex.
     *
     * @param vertextName name of the source vertex
     * @return the matching info, or {@code null} when the name is not a registered source
     */
    SourceVertexInfo getSourceVertexInfo(String vertextName) {
        SourceVertexInfo found = this.srcVertexInfo.get(vertextName);
        return found;
    }

    /**
     * @return a filtered view of {@code srcVertexInfo} containing only the entries whose edge
     *         uses SCATTER_GATHER data movement
     */
    Iterable<Map.Entry<String, SourceVertexInfo>> getBipartiteInfo() {
        Predicate<Map.Entry<String, SourceVertexInfo>> isScatterGather = new Predicate<Map.Entry<String, SourceVertexInfo>>(){

            public boolean apply(Map.Entry<String, SourceVertexInfo> input) {
                EdgeProperty.DataMovementType movement = input.getValue().edgeProperty.getDataMovementType();
                return EdgeProperty.DataMovementType.SCATTER_GATHER == movement;
            }
        };
        return Iterables.filter(this.srcVertexInfo.entrySet(), isScatterGather);
    }

    /**
     * Checks that every source vertex has been marked configured.
     *
     * <p>On success {@code sourceVerticesScheduled} is set so the scan is not repeated by
     * {@code preconditionsSatisfied()}.
     *
     * @return {@code false} as soon as an unconfigured source is found, {@code true} otherwise
     */
    private boolean canScheduleTasks() {
        for (Map.Entry<String, SourceVertexInfo> source : this.srcVertexInfo.entrySet()) {
            if (!source.getValue().vertexIsConfigured) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Waiting for vertex: {} in vertex: {}", source.getKey(), this.getContext().getVertexName());
                }
                return false;
            }
        }
        this.sourceVerticesScheduled = true;
        return true;
    }

    /**
     * Same result as {@link #getNumOfTasksToSchedule(float)}, with an INFO log line whenever the
     * result is positive.
     *
     * @param minFraction lowest completed-task fraction among bipartite sources
     * @return number of tasks that may be scheduled now
     */
    int getNumOfTasksToScheduleAndLog(float minFraction) {
        int schedulable = this.getNumOfTasksToSchedule(minFraction);
        if (schedulable <= 0) {
            return schedulable;
        }
        LOG.info("Scheduling {} tasks for vertex: {} with totalTasks: {}. {} source tasks completed out of {}. MinSourceTaskCompletedFraction: {} min: {} max: {}", new Object[]{schedulable, this.getContext().getVertexName(), this.totalTasksToSchedule, this.numBipartiteSourceTasksCompleted, this.totalNumBipartiteSourceTasks, Float.valueOf(minFraction), Float.valueOf(this.config.getMinFraction()), Float.valueOf(this.config.getMaxFraction())});
        return schedulable;
    }

    /**
     * Computes how many additional tasks the slow-start policy allows right now.
     *
     * <p>When every bipartite source task has completed, all pending tasks are returned.
     * Otherwise the completed fraction is mapped linearly from the configured min/max
     * fraction range onto [0, 1]; when min and max are equal the result is all-or-nothing at
     * the min fraction. The target count is rounded up and the number of tasks no longer
     * pending is subtracted.
     *
     * @param minSourceVertexCompletedTaskFraction lowest completed-task fraction among bipartite sources
     * @return number of tasks to schedule; may be zero or negative
     */
    int getNumOfTasksToSchedule(float minSourceVertexCompletedTaskFraction) {
        int pendingCount = this.pendingTasks.size();
        if (this.totalNumBipartiteSourceTasks == this.numBipartiteSourceTasksCompleted) {
            LOG.info("All source tasks completed. Ramping up {} remaining tasks for vertex: {}", (Object)pendingCount, (Object)this.getContext().getVertexName());
            return pendingCount;
        }
        float slowStartRange = this.config.getMaxFraction() - this.config.getMinFraction();
        float fraction;
        if (slowStartRange > 0.0f) {
            fraction = (minSourceVertexCompletedTaskFraction - this.config.getMinFraction()) / slowStartRange;
        } else {
            // min == max: schedule everything once the min fraction is reached.
            fraction = minSourceVertexCompletedTaskFraction < this.config.getMinFraction() ? 0.0f : 1.0f;
        }
        fraction = Math.max(0.0f, Math.min(1.0f, fraction));
        // Rounding up lets a single task be scheduled before all sources finish.
        int targetScheduled = (int)Math.ceil(fraction * (float)this.totalTasksToSchedule);
        int alreadyScheduled = this.totalTasksToSchedule - pendingCount;
        return targetScheduled - alreadyScheduled;
    }

    /**
     * Finds the smallest completed-task fraction among the bipartite sources.
     *
     * <p>Returns 1 immediately when all bipartite source tasks are complete. Sources with no
     * tasks are skipped.
     *
     * @return a value no greater than 1
     * @throws IllegalStateException if a bipartite source is not yet configured
     */
    float getMinSourceVertexCompletedTaskFraction() {
        if (this.numBipartiteSourceTasksCompleted == this.totalNumBipartiteSourceTasks) {
            return 1.0f;
        }
        float lowestFraction = 1.0f;
        for (Map.Entry<String, SourceVertexInfo> bipartiteEntry : this.getBipartiteInfo()) {
            SourceVertexInfo info = bipartiteEntry.getValue();
            Preconditions.checkState(info.vertexIsConfigured, "Vertex: " + bipartiteEntry.getKey());
            if (info.numTasks <= 0) {
                continue;
            }
            float fraction = (float)info.getNumCompletedTasks() / (float)info.numTasks;
            if (lowestFraction > fraction) {
                lowestFraction = fraction;
            }
        }
        return lowestFraction;
    }

    /**
     * Gate for scheduling passes: start-up must be finished and every source vertex configured.
     *
     * @return {@code true} when a scheduling pass may proceed
     */
    private boolean preconditionsSatisfied() {
        if (!this.onVertexStartedDone.get()) {
            return false;
        }
        // Short-circuit: the scan in canScheduleTasks() is skipped once the flag is set.
        boolean sourcesReady = this.sourceVerticesScheduled || this.canScheduleTasks();
        if (sourcesReady) {
            return true;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Defer scheduling tasks for vertex: {} as one task needs to be completed per source vertex", (Object)this.getContext().getVertexName());
        }
        return false;
    }

    /**
     * Runs one scheduling pass if the preconditions hold and, with auto-parallelism enabled,
     * the routing decision has been settled.
     *
     * @param completedSourceAttempt the completion that triggered the pass, or {@code null}
     */
    private void processPendingTasks(TaskAttemptIdentifier completedSourceAttempt) {
        if (this.preconditionsSatisfied()) {
            boolean parallelismSettled = !this.config.isAutoParallelismEnabled() || this.determineParallelismAndApply();
            if (parallelismSettled) {
                this.processPendingTasks();
                this.schedulePendingTasks(completedSourceAttempt);
            }
        }
    }

    /**
     * Subclass hook called from {@code initialize()} after {@code conf} has been created.
     *
     * @return the settings object that {@code initialize()} stores in {@code config}
     */
    abstract ShuffleVertexManagerBaseConfig initConfiguration();

    /**
     * Decodes the configuration from the user payload, obtains the subclass settings, builds
     * the initial pending task list, and announces a planned reconfiguration when
     * auto-parallelism is enabled.
     *
     * @throws TezUncheckedException if the user payload cannot be turned into a configuration
     */
    public void initialize() {
        UserPayload payload = this.getContext().getUserPayload();
        try {
            this.conf = TezUtils.createConfFromUserPayload(payload);
        }
        catch (IOException e) {
            throw new TezUncheckedException(e);
        }
        this.config = this.initConfiguration();
        this.updatePendingTasks();
        if (!this.config.isAutoParallelismEnabled()) {
            return;
        }
        this.getContext().vertexReconfigurationPlanned();
    }

    /**
     * Marks a source vertex as configured, records its task count, and triggers a scheduling pass.
     *
     * @param stateUpdate CONFIGURED notification for a registered source vertex
     * @throws IllegalArgumentException if the state is not CONFIGURED or the vertex is unknown
     * @throws IllegalStateException if the vertex was already marked configured
     */
    private void handleVertexStateUpdate(VertexStateUpdate stateUpdate) {
        String updatedVertex = stateUpdate.getVertexName();
        boolean isConfigured = stateUpdate.getVertexState() == VertexState.CONFIGURED;
        Preconditions.checkArgument(isConfigured, "Received incorrect state notification : " + stateUpdate.getVertexState() + " for vertex: " + updatedVertex + " in vertex: " + this.getContext().getVertexName());
        boolean isKnownSource = this.srcVertexInfo.containsKey(updatedVertex);
        Preconditions.checkArgument(isKnownSource, "Received incorrect vertex notification : " + stateUpdate.getVertexState() + " for vertex: " + updatedVertex + " in vertex: " + this.getContext().getVertexName());

        SourceVertexInfo info = this.srcVertexInfo.get(updatedVertex);
        Preconditions.checkState(!info.vertexIsConfigured);
        info.vertexIsConfigured = true;
        info.numTasks = this.getContext().getVertexNumTasks(updatedVertex);
        if (EdgeProperty.DataMovementType.SCATTER_GATHER == info.edgeProperty.getDataMovementType()) {
            this.totalNumBipartiteSourceTasks = this.totalNumBipartiteSourceTasks + info.numTasks;
        }
        LOG.info("Received configured notification : {} for vertex: {} in vertex: {} numBipartiteSourceTasks: {}", new Object[]{stateUpdate.getVertexState(), updatedVertex, this.getContext().getVertexName(), this.totalNumBipartiteSourceTasks});
        this.processPendingTasks(null);
    }

    /**
     * Entry point for vertex state notifications. Only CONFIGURED updates are used: they are
     * handled right away once start-up is done and buffered otherwise.
     *
     * @param stateUpdate the received notification
     */
    public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
        if (stateUpdate.getVertexState() != VertexState.CONFIGURED) {
            return;
        }
        if (!this.onVertexStartedDone.get()) {
            // Replayed later from onVertexStarted.
            this.pendingStateUpdates.add(stateUpdate);
            return;
        }
        this.handleVertexStateUpdate(stateUpdate);
    }

    /**
     * Intentionally empty: this manager takes no action on root input initialization.
     *
     * @param inputName       unused
     * @param inputDescriptor unused
     * @param events          unused
     */
    public synchronized void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
    }

    /**
     * Reconfigures this vertex to the given parallelism, replacing the edge manager of every
     * bipartite source edge with that source's {@code newDescriptor}.
     *
     * <p>The data source type, scheduling type, edge source and edge destination of each edge
     * are carried over from the existing edge property. No location hint is passed.
     *
     * @param finalTaskParallelism the new number of tasks for this vertex
     */
    private void reconfigVertex(int finalTaskParallelism) {
        HashMap<String, EdgeProperty> updatedEdges = new HashMap<String, EdgeProperty>(this.bipartiteSources);
        for (Map.Entry<String, SourceVertexInfo> bipartiteEntry : this.getBipartiteInfo()) {
            SourceVertexInfo info = bipartiteEntry.getValue();
            EdgeProperty current = info.edgeProperty;
            EdgeProperty replacement = EdgeProperty.create(
                    info.newDescriptor,
                    current.getDataSourceType(),
                    current.getSchedulingType(),
                    current.getEdgeSource(),
                    current.getEdgeDestination());
            updatedEdges.put(bipartiteEntry.getKey(), replacement);
        }
        this.getContext().reconfigureVertex(finalTaskParallelism, null, updatedEdges);
    }

    /**
     * Immutable settings shared by the shuffle vertex managers: the auto-parallelism switch,
     * the desired input size per task, and the slow-start min/max fractions.
     */
    static class ShuffleVertexManagerBaseConfig {
        private final boolean enableAutoParallelism;
        private final long desiredTaskInputDataSize;
        private final float slowStartMinFraction;
        private final float slowStartMaxFraction;

        /**
         * @param enableAutoParallelism    whether parallelism may be changed at runtime
         * @param desiredTaskInputDataSize desired amount of input data per task
         * @param slowStartMinFraction     lower slow-start bound; must not be below 0
         * @param slowStartMaxFraction     upper slow-start bound; must not exceed 1 or be below the min
         * @throws IllegalArgumentException if the fractions violate the bounds above
         */
        public ShuffleVertexManagerBaseConfig(boolean enableAutoParallelism, long desiredTaskInputDataSize, float slowStartMinFraction, float slowStartMaxFraction) {
            // Kept as three positive "is invalid" comparisons so that NaN inputs behave as before.
            boolean minBelowZero = slowStartMinFraction < 0.0f;
            boolean maxAboveOne = slowStartMaxFraction > 1.0f;
            boolean maxBelowMin = slowStartMaxFraction < slowStartMinFraction;
            if (minBelowZero || maxAboveOne || maxBelowMin) {
                throw new IllegalArgumentException("Invalid values for slowStartMinFraction/slowStartMaxFraction. Min cannot be < 0, max cannot be > 1, and max cannot be < min., configuredMin=" + slowStartMinFraction + ", configuredMax=" + slowStartMaxFraction);
            }
            this.slowStartMinFraction = slowStartMinFraction;
            this.slowStartMaxFraction = slowStartMaxFraction;
            this.desiredTaskInputDataSize = desiredTaskInputDataSize;
            this.enableAutoParallelism = enableAutoParallelism;
            LOG.info("Settings minFrac: {} maxFrac: {} auto: {} desiredTaskIput: {}", new Object[]{Float.valueOf(slowStartMinFraction), Float.valueOf(slowStartMaxFraction), enableAutoParallelism, desiredTaskInputDataSize});
        }

        /** @return whether auto-parallelism is enabled */
        public boolean isAutoParallelismEnabled() {
            return enableAutoParallelism;
        }

        /** @return the desired input data size per task */
        public long getDesiredTaskInputDataSize() {
            return desiredTaskInputDataSize;
        }

        /** @return the slow-start minimum fraction */
        public float getMinFraction() {
            return slowStartMinFraction;
        }

        /** @return the slow-start maximum fraction */
        public float getMaxFraction() {
            return slowStartMaxFraction;
        }
    }

    /**
     * Immutable result of a routing computation: the parallelism to apply and a location hint.
     */
    static class ReconfigVertexParams {
        private final VertexLocationHint locationHint;
        private final int finalParallelism;

        /**
         * @param finalParallelism number of tasks the vertex should end up with
         * @param locationHint     location hint associated with the reconfiguration
         */
        public ReconfigVertexParams(int finalParallelism, VertexLocationHint locationHint) {
            this.locationHint = locationHint;
            this.finalParallelism = finalParallelism;
        }

        /** @return the parallelism to apply */
        public int getFinalParallelism() {
            return finalParallelism;
        }

        /** @return the location hint given at construction */
        public VertexLocationHint getLocationHint() {
            return locationHint;
        }
    }

    /**
     * A task of this vertex that has not been scheduled yet, together with its input stats value.
     */
    static class PendingTaskInfo {
        private final int index;
        private int inputStats;

        /**
         * @param index task index within the vertex
         */
        public PendingTaskInfo(int index) {
            this.index = index;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("[index=");
            text.append(index).append(", inputStats=").append(inputStats).append(']');
            return text.toString();
        }

        /** @return the task index */
        public int getIndex() {
            return index;
        }

        /** @return the last accepted input stats value, 0 if none was accepted */
        public int getInputStats() {
            return inputStats;
        }

        /**
         * Stores a new input stats value when it is positive and differs from the current one.
         *
         * @param inputStats candidate value
         * @return {@code true} if the stored value changed
         */
        public boolean setInputStats(int inputStats) {
            if (inputStats <= 0 || inputStats == this.inputStats) {
                return false;
            }
            this.inputStats = inputStats;
            return true;
        }
    }

    /**
     * Mutable bookkeeping for one source vertex: its edge, configuration state, completed
     * tasks, and the output statistics reported through vertex manager events.
     */
    static class SourceVertexInfo {
        final EdgeProperty edgeProperty;
        // Set once the CONFIGURED notification for the source has been handled.
        boolean vertexIsConfigured;
        // Bit i is set once task i of the source has been reported complete.
        final BitSet finishedTaskSet;
        // Task count of the source; assigned when the source becomes configured.
        int numTasks;
        // Number of payload-carrying vertex manager events accepted from this source.
        int numVMEventsReceived;
        // Sum of the output sizes reported by those events.
        long outputSize;
        // Per-slot accumulated stats in MB.
        int[] statsInMB;
        // Edge manager descriptor used when the destination vertex is reconfigured.
        EdgeManagerPluginDescriptor newDescriptor;

        /**
         * @param edgeProperty        edge connecting the source to the destination vertex
         * @param totalTasksToSchedule length of the {@code statsInMB} array
         */
        SourceVertexInfo(EdgeProperty edgeProperty, int totalTasksToSchedule) {
            this.edgeProperty = edgeProperty;
            this.finishedTaskSet = new BitSet();
            this.statsInMB = new int[totalTasksToSchedule];
        }

        /** @return the task count of the source vertex */
        int getNumTasks() {
            return this.numTasks;
        }

        /** @return the number of distinct source tasks reported complete */
        int getNumCompletedTasks() {
            return this.finishedTaskSet.cardinality();
        }

        /**
         * Extrapolates the stats of one slot to all tasks of the source:
         * {@code statsInMB[index] * numTasks / numVMEventsReceived}.
         *
         * @param index slot in {@code statsInMB}
         * @return the extrapolated value, 0 when no event has been received, and
         *         {@link Integer#MAX_VALUE} when the estimate does not fit in an {@code int}
         */
        int getExpectedStatsInMBAtIndex(int index) {
            if (this.numVMEventsReceived == 0) {
                return 0;
            }
            // Multiply in long: the int product can silently wrap for large stats and task counts.
            long expected = (long)this.statsInMB[index] * this.numTasks / this.numVMEventsReceived;
            return (int)Math.min(expected, (long)Integer.MAX_VALUE);
        }
    }

    /**
     * Outcome of the routing decision: keep waiting, skip reconfiguration, or compute routing.
     */
    static enum ComputeRoutingAction {
        WAIT,
        SKIP,
        COMPUTE;

        /**
         * @return {@code true} for every constant except {@link #WAIT}
         */
        public boolean determined() {
            if (this == WAIT) {
                return false;
            }
            return true;
        }
    }
}

