/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.crypto.SecretKey;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.TaskContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ExceptionReporter;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleClientMetrics;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleInputEventHandlerOrderedGrouped;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class Shuffle
implements ExceptionReporter {
    private static final Logger LOG = LoggerFactory.getLogger(Shuffle.class);
    private static final int PROGRESS_FREQUENCY = 2000;
    private final Configuration conf;
    private final InputContext inputContext;
    private final ShuffleClientMetrics metrics;
    private final ShuffleInputEventHandlerOrderedGrouped eventHandler;
    private final ShuffleScheduler scheduler;
    private final MergeManager merger;
    private final SecretKey jobTokenSecret;
    private final JobTokenSecretManager jobTokenSecretMgr;
    private final CompressionCodec codec;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;
    private final int numFetchers;
    private final boolean localDiskFetchEnabled;
    private final String localHostname;
    private final int shufflePort;
    private AtomicReference<Throwable> throwable = new AtomicReference();
    private String throwingThreadName = null;
    private final RunShuffleCallable runShuffleCallable;
    private volatile ListenableFuture<TezRawKeyValueIterator> runShuffleFuture;
    private final ListeningExecutorService executor;
    private final String srcNameTrimmed;
    private final List<FetcherOrderedGrouped> fetchers;
    private final HttpConnection.HttpConnectionParams httpConnectionParams;
    private AtomicBoolean isShutDown = new AtomicBoolean(false);
    private AtomicBoolean fetchersClosed = new AtomicBoolean(false);
    private AtomicBoolean schedulerClosed = new AtomicBoolean(false);
    private AtomicBoolean mergerClosed = new AtomicBoolean(false);
    private final long startTime;
    private final TezCounter mergePhaseTime;
    private final TezCounter shufflePhaseTime;

    public Shuffle(InputContext inputContext, Configuration conf, int numInputs, long initialMemoryAvailable) throws IOException {
        this.inputContext = inputContext;
        this.conf = conf;
        this.httpConnectionParams = ShuffleUtils.constructHttpShuffleConnectionParams(conf);
        this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(), inputContext.getTaskVertexName(), inputContext.getTaskIndex(), this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
        this.srcNameTrimmed = TezUtilsInternal.cleanVertexName((String)inputContext.getSourceVertexName());
        this.jobTokenSecret = ShuffleUtils.getJobTokenSecretFromTokenBytes(inputContext.getServiceConsumerMetaData("mapreduce_shuffle"));
        this.jobTokenSecretMgr = new JobTokenSecretManager(this.jobTokenSecret);
        if (ConfigUtils.isIntermediateInputCompressed(conf)) {
            Class<? extends CompressionCodec> codecClass = ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
            this.codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, (Configuration)conf);
            this.codec.getDecompressorType();
        } else {
            this.codec = null;
        }
        this.ifileReadAhead = conf.getBoolean("tez.runtime.ifile.readahead", true);
        this.ifileReadAheadLength = this.ifileReadAhead ? conf.getInt("tez.runtime.ifile.readahead.bytes", 0x400000) : 0;
        Combiner combiner = TezRuntimeUtils.instantiateCombiner(conf, (TaskContext)inputContext);
        LocalFileSystem localFS = FileSystem.getLocal((Configuration)this.conf);
        LocalDirAllocator localDirAllocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        this.localHostname = inputContext.getExecutionContext().getHostName();
        ByteBuffer shuffleMetadata = inputContext.getServiceProviderMetaData("mapreduce_shuffle");
        this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
        TezCounter shuffledInputsCounter = inputContext.getCounters().findCounter((Enum)TaskCounter.NUM_SHUFFLED_INPUTS);
        TezCounter reduceShuffleBytes = inputContext.getCounters().findCounter((Enum)TaskCounter.SHUFFLE_BYTES);
        TezCounter reduceDataSizeDecompressed = inputContext.getCounters().findCounter((Enum)TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
        TezCounter failedShuffleCounter = inputContext.getCounters().findCounter((Enum)TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
        TezCounter spilledRecordsCounter = inputContext.getCounters().findCounter((Enum)TaskCounter.SPILLED_RECORDS);
        TezCounter reduceCombineInputCounter = inputContext.getCounters().findCounter((Enum)TaskCounter.COMBINE_INPUT_RECORDS);
        TezCounter mergedMapOutputsCounter = inputContext.getCounters().findCounter((Enum)TaskCounter.MERGED_MAP_OUTPUTS);
        TezCounter bytesShuffedToDisk = inputContext.getCounters().findCounter((Enum)TaskCounter.SHUFFLE_BYTES_TO_DISK);
        TezCounter bytesShuffedToDiskDirect = inputContext.getCounters().findCounter((Enum)TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
        TezCounter bytesShuffedToMem = inputContext.getCounters().findCounter((Enum)TaskCounter.SHUFFLE_BYTES_TO_MEM);
        LOG.info(this.srcNameTrimmed + ": " + "Shuffle assigned with " + numInputs + " inputs" + ", codec: " + (this.codec == null ? "None" : this.codec.getClass().getName()) + ", ifileReadAhead: " + this.ifileReadAhead);
        this.startTime = System.currentTimeMillis();
        this.scheduler = new ShuffleScheduler(this.inputContext, this.conf, numInputs, this, shuffledInputsCounter, reduceShuffleBytes, reduceDataSizeDecompressed, failedShuffleCounter, bytesShuffedToDisk, bytesShuffedToDiskDirect, bytesShuffedToMem, this.startTime, this.srcNameTrimmed);
        this.mergePhaseTime = inputContext.getCounters().findCounter((Enum)TaskCounter.MERGE_PHASE_TIME);
        this.shufflePhaseTime = inputContext.getCounters().findCounter((Enum)TaskCounter.SHUFFLE_PHASE_TIME);
        this.merger = new MergeManager(this.conf, (FileSystem)localFS, localDirAllocator, inputContext, combiner, spilledRecordsCounter, reduceCombineInputCounter, mergedMapOutputsCounter, this, initialMemoryAvailable, this.codec, this.ifileReadAhead, this.ifileReadAheadLength);
        this.eventHandler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, this.scheduler);
        ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + this.srcNameTrimmed + "}").build());
        int configuredNumFetchers = conf.getInt("tez.runtime.shuffle.parallel.copies", 20);
        this.numFetchers = Math.min(configuredNumFetchers, numInputs);
        LOG.info(this.srcNameTrimmed + ": " + "Num fetchers being started: " + this.numFetchers);
        this.fetchers = Lists.newArrayListWithCapacity((int)this.numFetchers);
        this.localDiskFetchEnabled = conf.getBoolean("tez.runtime.optimize.local.fetch", true);
        this.executor = MoreExecutors.listeningDecorator((ExecutorService)rawExecutor);
        this.runShuffleCallable = new RunShuffleCallable();
    }

    public void handleEvents(List<Event> events) throws IOException {
        if (!this.isShutDown.get()) {
            this.eventHandler.handleEvents(events);
        } else {
            LOG.info(this.srcNameTrimmed + ": " + "Ignoring events since already shutdown. EventCount: " + events.size());
        }
    }

    public boolean isInputReady() throws IOException, InterruptedException, TezException {
        if (this.isShutDown.get()) {
            throw new InputAlreadyClosedException();
        }
        if (this.throwable.get() != null) {
            this.handleThrowable(this.throwable.get());
        }
        if (this.runShuffleFuture == null) {
            return false;
        }
        return this.runShuffleFuture.isDone();
    }

    private void handleThrowable(Throwable t) throws IOException, InterruptedException {
        if (t instanceof IOException) {
            throw (IOException)t;
        }
        if (t instanceof InterruptedException) {
            throw (InterruptedException)t;
        }
        throw new UndeclaredThrowableException(t);
    }

    public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException, TezException {
        Preconditions.checkState((this.runShuffleFuture != null ? 1 : 0) != 0, (Object)"waitForInput can only be called after run");
        TezRawKeyValueIterator kvIter = null;
        try {
            kvIter = (TezRawKeyValueIterator)this.runShuffleFuture.get();
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            this.handleThrowable(cause);
        }
        if (this.isShutDown.get()) {
            throw new InputAlreadyClosedException();
        }
        if (this.throwable.get() != null) {
            this.handleThrowable(this.throwable.get());
        }
        return kvIter;
    }

    public void run() throws IOException {
        this.merger.configureAndStart();
        this.runShuffleFuture = this.executor.submit((Callable)((Object)this.runShuffleCallable));
        Futures.addCallback(this.runShuffleFuture, (FutureCallback)new ShuffleRunnerFutureCallback());
        this.executor.shutdown();
    }

    public void shutdown() {
        if (!this.isShutDown.getAndSet(true)) {
            LOG.info("Shutting down Shuffle for source: " + this.srcNameTrimmed);
            this.runShuffleFuture.cancel(true);
            this.cleanupIgnoreErrors();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void cleanupFetchers(boolean ignoreErrors) throws InterruptedException {
        InterruptedException ie = null;
        if (!this.fetchersClosed.getAndSet(true)) {
            List<FetcherOrderedGrouped> list = this.fetchers;
            synchronized (list) {
                for (FetcherOrderedGrouped fetcher : this.fetchers) {
                    try {
                        fetcher.shutDown();
                        LOG.info(this.srcNameTrimmed + ": " + "Shutdown.." + fetcher.getName() + ", status:" + fetcher.isAlive() + ", " + "isInterrupted:" + fetcher.isInterrupted());
                    }
                    catch (InterruptedException e) {
                        if (ignoreErrors) {
                            LOG.info(this.srcNameTrimmed + ": " + "Interrupted while shutting down fetchers. Ignoring.");
                            continue;
                        }
                        if (ie != null) {
                            ie = e;
                            continue;
                        }
                        LOG.warn(this.srcNameTrimmed + ": " + "Ignoring exception while shutting down fetcher since a previous one was seen and will be thrown " + e);
                    }
                }
                this.fetchers.clear();
            }
            if (ie != null) {
                throw ie;
            }
        }
    }

    private void cleanupShuffleScheduler(boolean ignoreErrors) throws InterruptedException {
        if (!this.schedulerClosed.getAndSet(true)) {
            try {
                this.scheduler.close();
            }
            catch (InterruptedException e) {
                if (ignoreErrors) {
                    LOG.info(this.srcNameTrimmed + ": " + "Interrupted while attempting to close the scheduler during cleanup. Ignoring");
                }
                throw e;
            }
        }
    }

    private void cleanupMerger(boolean ignoreErrors) throws Throwable {
        if (!this.mergerClosed.getAndSet(true)) {
            try {
                this.merger.close();
            }
            catch (Throwable e) {
                if (ignoreErrors) {
                    LOG.info(this.srcNameTrimmed + ": " + "Exception while trying to shutdown merger, Ignoring", e);
                }
                throw e;
            }
        }
    }

    private void cleanupIgnoreErrors() {
        try {
            if (this.eventHandler != null) {
                this.eventHandler.logProgress(true);
            }
            this.cleanupFetchers(true);
            this.cleanupShuffleScheduler(true);
            this.cleanupMerger(true);
        }
        catch (Throwable t) {
            LOG.info(this.srcNameTrimmed + ": " + "Error in cleaning up.., ", t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @InterfaceAudience.Private
    public synchronized void reportException(Throwable t) {
        if (this.throwable.get() == null) {
            LOG.info(this.srcNameTrimmed + ": " + "Setting throwable in reportException with message [" + t.getMessage() + "] from thread [" + Thread.currentThread().getName());
            this.throwable.set(t);
            this.throwingThreadName = Thread.currentThread().getName();
            ShuffleScheduler shuffleScheduler = this.scheduler;
            synchronized (shuffleScheduler) {
                this.scheduler.notifyAll();
            }
        }
    }

    @InterfaceAudience.Private
    public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
        return MergeManager.getInitialMemoryRequirement(conf, maxAvailableTaskMemory);
    }

    private class ShuffleRunnerFutureCallback
    implements FutureCallback<TezRawKeyValueIterator> {
        private ShuffleRunnerFutureCallback() {
        }

        public void onSuccess(TezRawKeyValueIterator result) {
            LOG.info(Shuffle.this.srcNameTrimmed + ": " + "Shuffle Runner thread complete");
        }

        public void onFailure(Throwable t) {
            if (Shuffle.this.isShutDown.get()) {
                LOG.info(Shuffle.this.srcNameTrimmed + ": " + "Already shutdown. Ignoring error");
            } else {
                LOG.error(Shuffle.this.srcNameTrimmed + ": " + "ShuffleRunner failed with error", t);
                Shuffle.this.inputContext.fatalError(t, "Shuffle Runner Failed");
                Shuffle.this.cleanupIgnoreErrors();
            }
        }
    }

    public static class ShuffleError
    extends IOException {
        private static final long serialVersionUID = 5753909320586607881L;

        ShuffleError(String msg, Throwable t) {
            super(msg, t);
        }
    }

    private class RunShuffleCallable
    extends CallableWithNdc<TezRawKeyValueIterator> {
        private RunShuffleCallable() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected TezRawKeyValueIterator callInternal() throws IOException, InterruptedException {
            Object object;
            Object object2 = this;
            synchronized (object2) {
                object = Shuffle.this.fetchers;
                synchronized (object) {
                    for (int i = 0; i < Shuffle.this.numFetchers; ++i) {
                        if (Shuffle.this.isShutDown.get()) continue;
                        FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(Shuffle.this.httpConnectionParams, Shuffle.this.scheduler, Shuffle.this.merger, Shuffle.this.metrics, Shuffle.this, Shuffle.this.jobTokenSecretMgr, Shuffle.this.ifileReadAhead, Shuffle.this.ifileReadAheadLength, Shuffle.this.codec, Shuffle.this.inputContext, Shuffle.this.conf, Shuffle.this.localDiskFetchEnabled, Shuffle.this.localHostname, Shuffle.this.shufflePort);
                        Shuffle.this.fetchers.add(fetcher);
                        fetcher.start();
                    }
                }
            }
            while (!Shuffle.this.scheduler.waitUntilDone(2000)) {
                object2 = Shuffle.this;
                synchronized (object2) {
                    if (Shuffle.this.throwable.get() != null) {
                        throw new ShuffleError("error in shuffle in " + Shuffle.this.throwingThreadName, (Throwable)Shuffle.this.throwable.get());
                    }
                }
            }
            Shuffle.this.shufflePhaseTime.setValue(System.currentTimeMillis() - Shuffle.this.startTime);
            Shuffle.this.cleanupFetchers(false);
            Shuffle.this.cleanupShuffleScheduler(false);
            TezRawKeyValueIterator kvIter = null;
            Shuffle.this.inputContext.notifyProgress();
            try {
                kvIter = Shuffle.this.merger.close();
            }
            catch (Throwable e) {
                throw new ShuffleError("Error while doing final merge ", e);
            }
            Shuffle.this.mergePhaseTime.setValue(System.currentTimeMillis() - Shuffle.this.startTime);
            Shuffle.this.inputContext.notifyProgress();
            object = Shuffle.this;
            synchronized (object) {
                if (Shuffle.this.throwable.get() != null) {
                    throw new ShuffleError("error in shuffle in " + Shuffle.this.throwingThreadName, (Throwable)Shuffle.this.throwable.get());
                }
            }
            Shuffle.this.inputContext.inputIsReady();
            LOG.info("merge complete for input vertex : " + Shuffle.this.srcNameTrimmed);
            return kvIter;
        }
    }
}

