/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ShuffleScheduler {
    static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>(){

        @Override
        protected Long initialValue() {
            return 0L;
        }
    };
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleScheduler.class);
    static final long INITIAL_PENALTY = 2000L;
    private static final float PENALTY_GROWTH_RATE = 1.3f;
    private final BitSet finishedMaps;
    private final int numInputs;
    private final String srcNameTrimmed;
    private int numFetchedSpills;
    private Map<MapHost.HostPortPartition, MapHost> mapLocations = new HashMap<MapHost.HostPortPartition, MapHost>();
    @VisibleForTesting
    final ConcurrentMap<PathPartition, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<PathPartition, InputAttemptIdentifier>();
    @VisibleForTesting
    final Map<Integer, ShuffleEventInfo> pipelinedShuffleInfoEventsMap;
    @VisibleForTesting
    final Set<MapHost> pendingHosts = new HashSet<MapHost>();
    private Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
    private final Random random = new Random(System.currentTimeMillis());
    private final DelayQueue<Penalty> penalties = new DelayQueue();
    private final Referee referee;
    @VisibleForTesting
    final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier, IntWritable>();
    final Set<MapHost.HostPort> uniqueHosts = Sets.newHashSet();
    private final Map<MapHost.HostPort, IntWritable> hostFailures = new HashMap<MapHost.HostPort, IntWritable>();
    private final InputContext inputContext;
    private final Shuffle shuffle;
    private final TezCounter shuffledInputsCounter;
    private final TezCounter skippedInputCounter;
    private final TezCounter reduceShuffleBytes;
    private final TezCounter reduceBytesDecompressed;
    @VisibleForTesting
    final TezCounter failedShuffleCounter;
    private final TezCounter bytesShuffledToDisk;
    private final TezCounter bytesShuffledToDiskDirect;
    private final TezCounter bytesShuffledToMem;
    private final TezCounter firstEventReceived;
    private final TezCounter lastEventReceived;
    @VisibleForTesting
    final AtomicInteger remainingMaps;
    private final long startTime;
    @VisibleForTesting
    long lastProgressTime;
    @VisibleForTesting
    long failedShufflesSinceLastCompletion;
    private int maxTaskOutputAtOnce;
    private int maxFetchFailuresBeforeReporting;
    private boolean reportReadErrorImmediately = true;
    private int maxFailedUniqueFetches = 5;
    private final int abortFailureLimit;
    private final int minFailurePerHost;
    private final float hostFailureFraction;
    private final float maxStallTimeFraction;
    private final float minReqProgressFraction;
    private final float maxAllowedFailedFetchFraction;
    private final boolean checkFailedFetchSinceLastCompletion;
    private long totalBytesShuffledTillNow = 0L;
    private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
    private final AtomicInteger nextProgressLineEventCount = new AtomicInteger(0);

    public ShuffleScheduler(InputContext inputContext, Configuration conf, int numberOfInputs, Shuffle shuffle, TezCounter shuffledInputsCounter, TezCounter reduceShuffleBytes, TezCounter reduceBytesDecompressed, TezCounter failedShuffleCounter, TezCounter bytesShuffledToDisk, TezCounter bytesShuffledToDiskDirect, TezCounter bytesShuffledToMem, long startTime, String srcNameTrimmed) {
        this.inputContext = inputContext;
        this.numInputs = numberOfInputs;
        int abortFailureLimitConf = conf.getInt("tez.runtime.shuffle.src-attempt.abort.limit", -1);
        this.abortFailureLimit = abortFailureLimitConf <= -1 ? Math.max(15, numberOfInputs / 10) : abortFailureLimitConf;
        this.remainingMaps = new AtomicInteger(numberOfInputs);
        this.finishedMaps = new BitSet(this.remainingMaps.get());
        this.minFailurePerHost = conf.getInt("tez.runtime.shuffle.min.failures.per.host", 4);
        Preconditions.checkArgument((this.minFailurePerHost >= 0 ? 1 : 0) != 0, (Object)("tez.runtime.shuffle.min.failures.per.host=" + this.minFailurePerHost + " should not be negative"));
        this.hostFailureFraction = conf.getFloat("tez.runtime.shuffle.acceptable.host-fetch.failure.fraction", 0.2f);
        this.maxStallTimeFraction = conf.getFloat("tez.runtime.shuffle.max.stall.time.fraction", 0.5f);
        Preconditions.checkArgument((this.maxStallTimeFraction >= 0.0f ? 1 : 0) != 0, (Object)("tez.runtime.shuffle.max.stall.time.fraction=" + this.maxStallTimeFraction + " should not be negative"));
        this.minReqProgressFraction = conf.getFloat("tez.runtime.shuffle.min.required.progress.fraction", 0.5f);
        Preconditions.checkArgument((this.minReqProgressFraction >= 0.0f ? 1 : 0) != 0, (Object)("tez.runtime.shuffle.min.required.progress.fraction=" + this.minReqProgressFraction + " should not be negative"));
        this.maxAllowedFailedFetchFraction = conf.getFloat("tez.runtime.shuffle.max.allowed.failed.fetch.fraction", 0.5f);
        Preconditions.checkArgument((this.maxAllowedFailedFetchFraction >= 0.0f ? 1 : 0) != 0, (Object)("tez.runtime.shuffle.max.allowed.failed.fetch.fraction=" + this.maxAllowedFailedFetchFraction + " should not be negative"));
        this.checkFailedFetchSinceLastCompletion = conf.getBoolean("tez.runtime.shuffle.failed.check.since-last.completion", true);
        this.srcNameTrimmed = srcNameTrimmed;
        this.referee = new Referee();
        this.shuffle = shuffle;
        this.shuffledInputsCounter = shuffledInputsCounter;
        this.reduceShuffleBytes = reduceShuffleBytes;
        this.reduceBytesDecompressed = reduceBytesDecompressed;
        this.failedShuffleCounter = failedShuffleCounter;
        this.bytesShuffledToDisk = bytesShuffledToDisk;
        this.bytesShuffledToDiskDirect = bytesShuffledToDiskDirect;
        this.bytesShuffledToMem = bytesShuffledToMem;
        this.startTime = startTime;
        this.lastProgressTime = startTime;
        this.maxFailedUniqueFetches = Math.min(numberOfInputs, this.maxFailedUniqueFetches);
        this.referee.start();
        this.maxFetchFailuresBeforeReporting = conf.getInt("tez.runtime.shuffle.fetch.failures.limit", 5);
        this.reportReadErrorImmediately = conf.getBoolean("tez.runtime.shuffle.notify.readerror", true);
        this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt("tez.runtime.shuffle.fetch.max.task.output.at.once", 20)));
        this.skippedInputCounter = inputContext.getCounters().findCounter((Enum)TaskCounter.NUM_SKIPPED_INPUTS);
        this.firstEventReceived = inputContext.getCounters().findCounter((Enum)TaskCounter.FIRST_EVENT_RECEIVED);
        this.lastEventReceived = inputContext.getCounters().findCounter((Enum)TaskCounter.LAST_EVENT_RECEIVED);
        this.pipelinedShuffleInfoEventsMap = new HashMap<Integer, ShuffleEventInfo>();
        LOG.info("ShuffleScheduler running for sourceVertex: " + inputContext.getSourceVertexName() + " with configuration: " + "maxFetchFailuresBeforeReporting=" + this.maxFetchFailuresBeforeReporting + ", reportReadErrorImmediately=" + this.reportReadErrorImmediately + ", maxFailedUniqueFetches=" + this.maxFailedUniqueFetches + ", abortFailureLimit=" + this.abortFailureLimit + ", hostFailureFraction=" + this.hostFailureFraction + ", minFailurePerHost=" + this.minFailurePerHost + ", maxAllowedFailedFetchFraction=" + this.maxAllowedFailedFetchFraction + ", maxStallTimeFraction=" + this.maxStallTimeFraction + ", minReqProgressFraction=" + this.minReqProgressFraction + ", checkFailedFetchSinceLastCompletion=" + this.checkFailedFetchSinceLastCompletion + ", maxTaskOutputAtOnce=" + this.maxTaskOutputAtOnce);
    }

    protected synchronized void updateEventReceivedTime() {
        long relativeTime = System.currentTimeMillis() - this.startTime;
        if (this.firstEventReceived.getValue() == 0L) {
            this.firstEventReceived.setValue(relativeTime);
            this.lastEventReceived.setValue(relativeTime);
            return;
        }
        this.lastEventReceived.setValue(relativeTime);
    }

    public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier, MapHost host, long bytesCompressed, long bytesDecompressed, long millis, MapOutput output, boolean isLocalFetch) throws IOException {
        this.inputContext.notifyProgress();
        if (!this.isInputFinished(srcAttemptIdentifier.getInputIdentifier())) {
            if (!isLocalFetch) {
                this.failedShufflesSinceLastCompletion = 0L;
            }
            if (output != null) {
                this.failureCounts.remove(srcAttemptIdentifier);
                if (host != null) {
                    this.hostFailures.remove(new MapHost.HostPort(host.getHost(), host.getPort()));
                }
                output.commit();
                ShuffleUtils.logIndividualFetchComplete(LOG, millis, bytesCompressed, bytesDecompressed, output.getType().toString(), srcAttemptIdentifier);
                if (output.getType() == MapOutput.Type.DISK) {
                    this.bytesShuffledToDisk.increment(bytesCompressed);
                } else if (output.getType() == MapOutput.Type.DISK_DIRECT) {
                    this.bytesShuffledToDiskDirect.increment(bytesCompressed);
                } else {
                    this.bytesShuffledToMem.increment(bytesCompressed);
                }
                this.shuffledInputsCounter.increment(1L);
            } else {
                this.skippedInputCounter.increment(1L);
            }
            if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
                this.remainingMaps.decrementAndGet();
                this.setInputFinished(srcAttemptIdentifier.getInputIdentifier());
                ++this.numFetchedSpills;
            } else {
                int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
                if (!this.validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
                    return;
                }
                ShuffleEventInfo eventInfo = this.pipelinedShuffleInfoEventsMap.get(inputIdentifier);
                if (eventInfo == null && output == null) {
                    eventInfo = new ShuffleEventInfo(srcAttemptIdentifier);
                    this.pipelinedShuffleInfoEventsMap.put(inputIdentifier, eventInfo);
                }
                assert (eventInfo != null);
                eventInfo.spillProcessed(srcAttemptIdentifier.getSpillEventId());
                ++this.numFetchedSpills;
                if (srcAttemptIdentifier.getFetchTypeInfo() == InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE) {
                    eventInfo.setFinalEventId(srcAttemptIdentifier.getSpillEventId());
                }
                if (eventInfo.isDone()) {
                    this.remainingMaps.decrementAndGet();
                    this.setInputFinished(inputIdentifier);
                    this.pipelinedShuffleInfoEventsMap.remove(inputIdentifier);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Removing : " + srcAttemptIdentifier + ", pending: " + this.pipelinedShuffleInfoEventsMap);
                    }
                }
                if (LOG.isTraceEnabled()) {
                    LOG.trace("eventInfo " + eventInfo.toString());
                }
            }
            if (this.remainingMaps.get() == 0) {
                LOG.info(this.srcNameTrimmed + ": " + "All inputs fetched for input vertex : " + this.inputContext.getSourceVertexName());
                this.notifyAll();
            }
            this.lastProgressTime = System.currentTimeMillis();
            this.totalBytesShuffledTillNow += bytesCompressed;
            this.logProgress();
            this.reduceShuffleBytes.increment(bytesCompressed);
            this.reduceBytesDecompressed.increment(bytesDecompressed);
            if (LOG.isDebugEnabled()) {
                LOG.debug("src task: " + TezRuntimeUtils.getTaskAttemptIdentifier(this.inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier(), srcAttemptIdentifier.getAttemptNumber()) + " done");
            }
        } else {
            LOG.warn(this.srcNameTrimmed + ": Duplicate fetch of input " + "no longer needs to be fetched: " + srcAttemptIdentifier);
            if (output != null) {
                output.abort();
            }
        }
    }

    private boolean validateInputAttemptForPipelinedShuffle(InputAttemptIdentifier input) {
        if (input.canRetrieveInputInChunks()) {
            ShuffleEventInfo eventInfo = this.pipelinedShuffleInfoEventsMap.get(input.getInputIdentifier());
            if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) {
                this.reportExceptionForInput(new IOException("Previous event already got scheduled for " + input + ". Previous attempt's data could have been already merged " + "to memory/disk outputs.  Failing the fetch early. currentAttemptNum=" + eventInfo.attemptNum + ", eventsProcessed=" + eventInfo.eventsProcessed + ", newAttemptNum=" + input.getAttemptNumber()));
                return false;
            }
            if (eventInfo == null) {
                this.pipelinedShuffleInfoEventsMap.put(input.getInputIdentifier(), new ShuffleEventInfo(input));
            }
        }
        return true;
    }

    @VisibleForTesting
    void reportExceptionForInput(Exception exception) {
        LOG.error(this.srcNameTrimmed + ": " + "Reporting exception for input", (Throwable)exception);
        this.shuffle.reportException(exception);
    }

    private void logProgress() {
        int inputsDone = this.numInputs - this.remainingMaps.get();
        if (inputsDone > this.nextProgressLineEventCount.get() || inputsDone == this.numInputs) {
            this.nextProgressLineEventCount.addAndGet(50);
            double mbs = (double)this.totalBytesShuffledTillNow / 1048576.0;
            long secsSinceStart = (System.currentTimeMillis() - this.startTime) / 1000L + 1L;
            double transferRate = mbs / (double)secsSinceStart;
            LOG.info("copy(" + inputsDone + " (spillsFetched=" + this.numFetchedSpills + ") of " + this.numInputs + ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) " + this.mbpsFormat.format(transferRate) + " MB/s)");
        }
    }

    public synchronized void copyFailed(InputAttemptIdentifier srcAttempt, MapHost host, boolean readError, boolean connectError, boolean isLocalFetch) {
        boolean shouldInformAM;
        this.failedShuffleCounter.increment(1L);
        this.inputContext.notifyProgress();
        int failures = this.incrementAndGetFailureAttempt(srcAttempt);
        if (!isLocalFetch) {
            ++this.failedShufflesSinceLastCompletion;
        }
        boolean bl = shouldInformAM = this.reportReadErrorImmediately && (readError || connectError) || failures % this.maxFetchFailuresBeforeReporting == 0;
        if (shouldInformAM) {
            this.informAM(srcAttempt);
        }
        if (!this.isShuffleHealthy(srcAttempt)) {
            return;
        }
        this.penalizeHost(host, failures);
    }

    private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) {
        int attemptFailures = this.getFailureCount(srcAttempt);
        if (attemptFailures >= this.abortFailureLimit) {
            String errorMsg = "Failed " + attemptFailures + " times trying to " + "download from " + TezRuntimeUtils.getTaskAttemptIdentifier(this.inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber()) + ". threshold=" + this.abortFailureLimit;
            IOException ioe = new IOException(errorMsg);
            this.shuffle.reportException(ioe);
            return true;
        }
        return false;
    }

    private void penalizeHost(MapHost host, int failures) {
        host.penalize();
        MapHost.HostPort hostPort = new MapHost.HostPort(host.getHost(), host.getPort());
        if (this.hostFailures.containsKey(hostPort)) {
            IntWritable x = this.hostFailures.get(hostPort);
            x.set(x.get() + 1);
        } else {
            this.hostFailures.put(hostPort, new IntWritable(1));
        }
        long delay = (long)(2000.0 * Math.pow(1.3f, failures));
        this.penalties.add(new Penalty(host, delay));
    }

    private int getFailureCount(InputAttemptIdentifier srcAttempt) {
        IntWritable failureCount = this.failureCounts.get(srcAttempt);
        return failureCount == null ? 0 : failureCount.get();
    }

    private int incrementAndGetFailureAttempt(InputAttemptIdentifier srcAttempt) {
        int failures = 1;
        if (this.failureCounts.containsKey(srcAttempt)) {
            IntWritable x = this.failureCounts.get(srcAttempt);
            x.set(x.get() + 1);
            failures = x.get();
        } else {
            this.failureCounts.put(srcAttempt, new IntWritable(1));
        }
        return failures;
    }

    public void reportLocalError(IOException ioe) {
        LOG.error(this.srcNameTrimmed + ": " + "Shuffle failed : caused by local error", (Throwable)ioe);
        this.shuffle.reportException(ioe);
    }

    private void informAM(InputAttemptIdentifier srcAttempt) {
        LOG.info(this.srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: " + srcAttempt + " taskAttemptIdentifier: " + TezRuntimeUtils.getTaskAttemptIdentifier(this.inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber()) + " to AM.");
        ArrayList failedEvents = Lists.newArrayListWithCapacity((int)1);
        failedEvents.add(InputReadErrorEvent.create((String)("Fetch failure for " + TezRuntimeUtils.getTaskAttemptIdentifier(this.inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier(), srcAttempt.getAttemptNumber()) + " to jobtracker."), (int)srcAttempt.getInputIdentifier(), (int)srcAttempt.getAttemptNumber()));
        this.inputContext.sendEvents((List)failedEvents);
    }

    private boolean hasFailedAcrossNodes(String logContext) {
        MapHost.HostPort host;
        IntWritable failures;
        int numUniqueHosts = this.uniqueHosts.size();
        Preconditions.checkArgument((numUniqueHosts > 0 ? 1 : 0) != 0, (Object)"No values in unique hosts");
        int threshold = Math.max(3, (int)Math.ceil((float)numUniqueHosts * this.hostFailureFraction));
        int total = 0;
        boolean failedAcrossNodes = false;
        Iterator<MapHost.HostPort> i$ = this.uniqueHosts.iterator();
        while (i$.hasNext() && ((failures = this.hostFailures.get(host = i$.next())) == null || failures.get() <= this.minFailurePerHost || !(failedAcrossNodes = ++total > threshold * this.minFailurePerHost))) {
        }
        LOG.info(logContext + ", numUniqueHosts=" + numUniqueHosts + ", hostFailureThreshold=" + threshold + ", hostFailuresCount=" + this.hostFailures.size() + ", hosts crossing threshold=" + total + ", reducerFetchIssues=" + failedAcrossNodes);
        return failedAcrossNodes;
    }

    private boolean allEventsReceived() {
        if (!this.pipelinedShuffleInfoEventsMap.isEmpty()) {
            return this.pipelinedShuffleInfoEventsMap.size() == this.numInputs;
        }
        return (long)this.pathToIdentifierMap.size() + this.skippedInputCounter.getValue() == (long)this.numInputs;
    }

    private boolean isFetcherHealthy(String logContext) {
        long totalFailures = this.failedShuffleCounter.getValue();
        int doneMaps = this.numInputs - this.remainingMaps.get();
        boolean fetcherHealthy = true;
        if (doneMaps > 0) {
            boolean bl = fetcherHealthy = (float)totalFailures / (float)(totalFailures + (long)doneMaps) < this.maxAllowedFailedFetchFraction;
        }
        if (fetcherHealthy && this.allEventsReceived()) {
            boolean failedAcrossNodes;
            if (this.hostFailureFraction > 0.0f && (failedAcrossNodes = this.hasFailedAcrossNodes(logContext))) {
                return false;
            }
            if (this.checkFailedFetchSinceLastCompletion && this.failedShufflesSinceLastCompletion >= (long)(this.remainingMaps.get() * this.minFailurePerHost)) {
                fetcherHealthy = (float)this.failedShufflesSinceLastCompletion / (float)(this.failedShufflesSinceLastCompletion + (long)this.remainingMaps.get()) < this.maxAllowedFailedFetchFraction;
                LOG.info(logContext + ", fetcherHealthy=" + fetcherHealthy + ", failedShufflesSinceLastCompletion=" + this.failedShufflesSinceLastCompletion + ", remainingMaps=" + this.remainingMaps.get());
            }
        }
        return fetcherHealthy;
    }

    boolean isShuffleHealthy(InputAttemptIdentifier srcAttempt) {
        boolean reducerStalled;
        if (this.isAbortLimitExceeedFor(srcAttempt)) {
            return false;
        }
        float MIN_REQUIRED_PROGRESS_PERCENT = this.minReqProgressFraction;
        float MAX_ALLOWED_STALL_TIME_PERCENT = this.maxStallTimeFraction;
        int doneMaps = this.numInputs - this.remainingMaps.get();
        String logContext = "srcAttempt=" + srcAttempt.toString();
        boolean fetcherHealthy = this.isFetcherHealthy(logContext);
        boolean reducerProgressedEnough = (float)doneMaps / (float)this.numInputs >= MIN_REQUIRED_PROGRESS_PERCENT;
        int stallDuration = (int)(System.currentTimeMillis() - this.lastProgressTime);
        int shuffleProgressDuration = (int)(this.lastProgressTime - this.startTime);
        boolean bl = reducerStalled = shuffleProgressDuration > 0 && (float)stallDuration / (float)shuffleProgressDuration >= MAX_ALLOWED_STALL_TIME_PERCENT;
        if (!(this.failureCounts.size() < this.maxFailedUniqueFetches && this.failureCounts.size() != this.numInputs - doneMaps || fetcherHealthy || reducerProgressedEnough && !reducerStalled)) {
            String errorMsg = this.srcNameTrimmed + ": " + "Shuffle failed with too many fetch failures and insufficient progress!" + "failureCounts=" + this.failureCounts.size() + ", pendingInputs=" + (this.numInputs - doneMaps) + ", fetcherHealthy=" + fetcherHealthy + ", reducerProgressedEnough=" + reducerProgressedEnough + ", reducerStalled=" + reducerStalled;
            LOG.error(errorMsg);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Host failures=" + this.hostFailures.keySet());
            }
            this.shuffle.reportException(new IOException(errorMsg));
            return false;
        }
        return true;
    }

    public synchronized void addKnownMapOutput(String inputHostName, int port, int partitionId, InputAttemptIdentifier srcAttempt) {
        this.uniqueHosts.add(new MapHost.HostPort(inputHostName, port));
        MapHost.HostPortPartition identifier = new MapHost.HostPortPartition(inputHostName, port, partitionId);
        MapHost host = this.mapLocations.get(identifier);
        if (host == null) {
            host = new MapHost(inputHostName, port, partitionId);
            this.mapLocations.put(identifier, host);
        }
        if (!this.validateInputAttemptForPipelinedShuffle(srcAttempt)) {
            return;
        }
        host.addKnownMap(srcAttempt);
        this.pathToIdentifierMap.put(this.getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), partitionId), srcAttempt);
        if (host.getState() == MapHost.State.PENDING) {
            this.pendingHosts.add(host);
            this.notifyAll();
        }
    }

    public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
        LOG.info(this.srcNameTrimmed + ": " + "Adding obsolete input: " + srcAttempt);
        if (this.pipelinedShuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) {
            this.shuffle.reportException(new IOException(srcAttempt + " is marked as obsoleteInput, but it " + "exists in shuffleInfoEventMap. Some data could have been already merged " + "to memory/disk outputs.  Failing the fetch early."));
            return;
        }
        this.obsoleteInputs.add(srcAttempt);
    }

    public synchronized void putBackKnownMapOutput(MapHost host, InputAttemptIdentifier srcAttempt) {
        host.addKnownMap(srcAttempt);
    }

    public synchronized MapHost getHost() throws InterruptedException {
        while (this.pendingHosts.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("PendingHosts=" + this.pendingHosts);
            }
            this.wait();
        }
        MapHost host = null;
        Iterator<MapHost> iter = this.pendingHosts.iterator();
        int numToPick = this.random.nextInt(this.pendingHosts.size());
        for (int i = 0; i <= numToPick; ++i) {
            host = iter.next();
        }
        this.pendingHosts.remove(host);
        host.markBusy();
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.srcNameTrimmed + ": " + "Assigning " + host + " with " + host.getNumKnownMapOutputs() + " to " + Thread.currentThread().getName());
        }
        shuffleStart.set(System.currentTimeMillis());
        return host;
    }

    public InputAttemptIdentifier getIdentifierForFetchedOutput(String path, int reduceId) {
        return (InputAttemptIdentifier)this.pathToIdentifierMap.get(this.getIdentifierFromPathAndReduceId(path, reduceId));
    }

    private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
        return !this.obsoleteInputs.contains(id) && !this.isInputFinished(id.getInputIdentifier());
    }

    public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
        List<InputAttemptIdentifier> origList = host.getAndClearKnownMaps();
        LinkedListMultimap dedupedList = LinkedListMultimap.create();
        for (InputAttemptIdentifier id : origList) {
            if (this.inputShouldBeConsumed(id)) {
                Integer inputNumber = id.getInputIdentifier();
                List oldIdList = dedupedList.get((Object)inputNumber);
                if (oldIdList == null || oldIdList.isEmpty()) {
                    dedupedList.put((Object)inputNumber, (Object)id);
                    continue;
                }
                boolean addIdentifierToList = false;
                Iterator oldIdIterator = oldIdList.iterator();
                while (oldIdIterator.hasNext()) {
                    InputAttemptIdentifier oldId = (InputAttemptIdentifier)oldIdIterator.next();
                    if (id.canRetrieveInputInChunks()) {
                        if (oldId.getSpillEventId() == id.getSpillEventId()) {
                            addIdentifierToList = false;
                            continue;
                        }
                        if (oldId.getAttemptNumber() == id.getAttemptNumber()) {
                            addIdentifierToList = true;
                            break;
                        }
                    }
                    if (oldId.getAttemptNumber() >= id.getAttemptNumber()) continue;
                    oldIdIterator.remove();
                    LOG.warn("Old Src for InputIndex: " + inputNumber + " with attemptNumber: " + oldId.getAttemptNumber() + " was not determined to be invalid. Ignoring it for now in favour of " + id.getAttemptNumber());
                    addIdentifierToList = true;
                    break;
                }
                if (!addIdentifierToList) continue;
                dedupedList.put((Object)inputNumber, (Object)id);
                continue;
            }
            LOG.info("Ignoring finished or obsolete source: " + id);
        }
        ArrayList<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
        int includedMaps = 0;
        int totalSize = dedupedList.size();
        for (Integer inputIndex : dedupedList.keySet()) {
            List attemptIdentifiers = dedupedList.get((Object)inputIndex);
            for (InputAttemptIdentifier inputAttemptIdentifier : attemptIdentifiers) {
                if (includedMaps++ >= this.maxTaskOutputAtOnce) {
                    host.addKnownMap(inputAttemptIdentifier);
                    continue;
                }
                result.add(inputAttemptIdentifier);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("assigned " + includedMaps + " of " + totalSize + " to " + host + " to " + Thread.currentThread().getName());
        }
        return result;
    }

    public synchronized void freeHost(MapHost host) {
        if (host.getState() != MapHost.State.PENALIZED && host.markAvailable() == MapHost.State.PENDING) {
            this.pendingHosts.add(host);
            this.notifyAll();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(host + " freed by " + Thread.currentThread().getName() + " in " + (System.currentTimeMillis() - shuffleStart.get()) + "ms");
        }
    }

    public synchronized void resetKnownMaps() {
        this.mapLocations.clear();
        this.obsoleteInputs.clear();
        this.pendingHosts.clear();
        this.pathToIdentifierMap.clear();
    }

    public synchronized boolean isDone() {
        return this.remainingMaps.get() == 0;
    }

    public synchronized boolean waitUntilDone(int millis) throws InterruptedException {
        if (this.remainingMaps.get() > 0) {
            this.wait(millis);
            return this.remainingMaps.get() == 0;
        }
        return true;
    }

    private PathPartition getIdentifierFromPathAndReduceId(String path, int reduceId) {
        return new PathPartition(path, reduceId);
    }

    public void close() throws InterruptedException {
        this.logProgress();
        this.referee.interrupt();
        this.referee.join();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void setInputFinished(int inputIndex) {
        BitSet bitSet = this.finishedMaps;
        synchronized (bitSet) {
            this.finishedMaps.set(inputIndex);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean isInputFinished(int inputIndex) {
        BitSet bitSet = this.finishedMaps;
        synchronized (bitSet) {
            return this.finishedMaps.get(inputIndex);
        }
    }

    private class Referee
    extends Thread {
        public Referee() {
            this.setName("ShufflePenaltyReferee {" + TezUtilsInternal.cleanVertexName((String)ShuffleScheduler.this.inputContext.getSourceVertexName()) + "}");
            this.setDaemon(true);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            try {
                while (true) {
                    MapHost host = ((Penalty)((ShuffleScheduler)ShuffleScheduler.this).penalties.take()).host;
                    ShuffleScheduler shuffleScheduler = ShuffleScheduler.this;
                    synchronized (shuffleScheduler) {
                        if (host.markAvailable() == MapHost.State.PENDING) {
                            ShuffleScheduler.this.pendingHosts.add(host);
                            ShuffleScheduler.this.notifyAll();
                        }
                    }
                }
            }
            catch (InterruptedException ie) {
                return;
            }
            catch (Throwable t) {
                ShuffleScheduler.this.shuffle.reportException(t);
                return;
            }
        }
    }

    private static class Penalty
    implements Delayed {
        MapHost host;
        private long endTime;

        Penalty(MapHost host, long delay) {
            this.host = host;
            this.endTime = System.currentTimeMillis() + delay;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long remainingTime = this.endTime - System.currentTimeMillis();
            return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            long other = ((Penalty)o).endTime;
            return this.endTime == other ? 0 : (this.endTime < other ? -1 : 1);
        }
    }

    static class ShuffleEventInfo {
        BitSet eventsProcessed;
        int finalEventId = -1;
        int attemptNum;
        String id;

        ShuffleEventInfo(InputAttemptIdentifier input) {
            this.id = input.getInputIdentifier() + "_" + input.getAttemptNumber();
            this.eventsProcessed = new BitSet();
            this.attemptNum = input.getAttemptNumber();
        }

        void spillProcessed(int spillId) {
            if (this.finalEventId != -1) {
                Preconditions.checkState((this.eventsProcessed.cardinality() <= this.finalEventId + 1 ? 1 : 0) != 0, (Object)("Wrong state. eventsProcessed cardinality=" + this.eventsProcessed.cardinality() + " " + "finalEventId=" + this.finalEventId + ", spillId=" + spillId + ", " + this.toString()));
            }
            this.eventsProcessed.set(spillId);
        }

        void setFinalEventId(int spillId) {
            this.finalEventId = spillId;
        }

        boolean isDone() {
            return this.finalEventId != -1 && this.finalEventId + 1 == this.eventsProcessed.cardinality();
        }

        public String toString() {
            return "[eventsProcessed=" + this.eventsProcessed + ", finalEventId=" + this.finalEventId + ", id=" + this.id + ", attemptNum=" + this.attemptNum + "]";
        }
    }

    public static class PathPartition {
        final String path;
        final int partition;

        PathPartition(String path, int partition) {
            this.path = path;
            this.partition = partition;
        }

        public int hashCode() {
            int prime = 31;
            int result = 1;
            result = 31 * result + (this.path == null ? 0 : this.path.hashCode());
            result = 31 * result + this.partition;
            return result;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (this.getClass() != obj.getClass()) {
                return false;
            }
            PathPartition other = (PathPartition)obj;
            if (this.path == null ? other.path != null : !this.path.equals(other.path)) {
                return false;
            }
            return this.partition == other.partition;
        }

        public String toString() {
            return "PathPartition [path=" + this.path + ", partition=" + this.partition + "]";
        }
    }
}

