/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.sort.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.io.NonSyncDataOutputStream;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.comparator.ProxyComparator;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.utils.LocalProgress;
import org.apache.tez.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipelinedSorter
extends ExternalSorter {
    private static final Logger LOG = LoggerFactory.getLogger(PipelinedSorter.class);
    public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
    private static final int APPROX_HEADER_LENGTH = 150;
    private final int partitionBits;
    private static final int PARTITION = 0;
    private static final int KEYSTART = 1;
    private static final int VALSTART = 2;
    private static final int VALLEN = 3;
    private static final int NMETA = 4;
    private static final int METASIZE = 16;
    private final int minSpillsForCombine;
    private final ProxyComparator hasher;
    private SortSpan span;
    private final long capacity;
    private int bufferOverflowRecursion;
    private final SpanMerger merger;
    private final ExecutorService sortmaster;
    private final ArrayList<TezSpillRecord> indexCacheList = new ArrayList();
    private final boolean pipelinedShuffle;
    private long currentAllocatableMemory;
    @VisibleForTesting
    final List<ByteBuffer> buffers;
    @VisibleForTesting
    List<Integer> bufferUsage;
    final int maxNumberOfBlocks;
    private int bufferIndex = -1;
    private final int MIN_BLOCK_SIZE;
    private final boolean lazyAllocateMem = this.conf.getBoolean("tez.runtime.pipelined.sorter.lazy-allocate.memory", false);
    private final Deflater deflater;
    private final String auxiliaryService;
    private final List<Event> finalEvents;

    public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs, long initialMemoryAvailable) throws IOException {
        super(outputContext, conf, numOutputs, initialMemoryAvailable);
        if (this.lazyAllocateMem) {
            this.MIN_BLOCK_SIZE = 0xFFFFFC0;
        } else {
            int minBlockSize = conf.getInt("tez.runtime.pipelined.sorter.min-block.size.in.mb", 2000);
            Preconditions.checkArgument((minBlockSize > 0 && minBlockSize < 2047 ? 1 : 0) != 0, (Object)("tez.runtime.pipelined.sorter.min-block.size.in.mb=" + minBlockSize + " should be a positive value between 0 and 2047"));
            this.MIN_BLOCK_SIZE = minBlockSize << 20;
        }
        StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ").append(outputContext.getInputOutputVertexNames()).append(": ");
        this.partitionBits = this.bitcount(this.partitions) + 1;
        boolean confPipelinedShuffle = this.conf.getBoolean("tez.runtime.pipelined-shuffle.enabled", false);
        this.pipelinedShuffle = !this.isFinalMergeEnabled() && confPipelinedShuffle;
        this.auxiliaryService = conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");
        long sortmb = this.availableMemoryMb;
        long maxMemLimit = sortmb << 20;
        initialSetupLogLine.append(", UsingHashComparator=");
        if (this.comparator instanceof ProxyComparator) {
            this.hasher = (ProxyComparator)this.comparator;
            initialSetupLogLine.append(true);
        } else {
            this.hasher = null;
            initialSetupLogLine.append(false);
        }
        LOG.info(initialSetupLogLine.toString());
        long totalCapacityWithoutMeta = 0L;
        long availableMem = maxMemLimit;
        int numBlocks = 0;
        while (availableMem > 0L) {
            long size = Math.min(availableMem, (long)this.computeBlockSize(availableMem, maxMemLimit));
            int sizeWithoutMeta = (int)(size - size % 16L);
            totalCapacityWithoutMeta += (long)sizeWithoutMeta;
            availableMem -= size;
            ++numBlocks;
        }
        this.currentAllocatableMemory = maxMemLimit;
        this.maxNumberOfBlocks = numBlocks;
        this.capacity = totalCapacityWithoutMeta;
        this.buffers = Lists.newArrayListWithCapacity((int)this.maxNumberOfBlocks);
        this.bufferUsage = Lists.newArrayListWithCapacity((int)this.maxNumberOfBlocks);
        this.allocateSpace();
        if (!this.lazyAllocateMem) {
            LOG.info("Pre allocating rest of memory buffers upfront");
            while (this.allocateSpace() != null) {
            }
        }
        initialSetupLogLine.append("#blocks=").append(this.maxNumberOfBlocks);
        initialSetupLogLine.append(", maxMemUsage=").append(maxMemLimit);
        initialSetupLogLine.append(", lazyAllocateMem=").append(this.lazyAllocateMem);
        initialSetupLogLine.append(", minBlockSize=").append(this.MIN_BLOCK_SIZE);
        initialSetupLogLine.append(", initial BLOCK_SIZE=").append(this.buffers.get(0).capacity());
        initialSetupLogLine.append(", finalMergeEnabled=").append(this.isFinalMergeEnabled());
        initialSetupLogLine.append(", pipelinedShuffle=").append(this.pipelinedShuffle);
        initialSetupLogLine.append(", sendEmptyPartitions=").append(this.sendEmptyPartitionDetails);
        initialSetupLogLine.append(", ").append("tez.runtime.io.sort.mb").append("=").append(sortmb);
        Preconditions.checkState((this.buffers.size() > 0 ? 1 : 0) != 0, (Object)"Atleast one buffer needs to be present");
        LOG.info(initialSetupLogLine.toString());
        this.span = new SortSpan(this.buffers.get(this.bufferIndex), 0x100000, 16, this.comparator);
        this.merger = new SpanMerger();
        int sortThreads = this.conf.getInt("tez.runtime.pipelined.sorter.sort.threads", 2);
        this.sortmaster = Executors.newFixedThreadPool(sortThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Sorter {" + TezUtilsInternal.cleanVertexName((String)outputContext.getTaskVertexName()) + " -> " + TezUtilsInternal.cleanVertexName((String)outputContext.getDestinationVertexName()) + "} #%d").build());
        this.valSerializer.open((OutputStream)this.span.out);
        this.keySerializer.open((OutputStream)this.span.out);
        this.minSpillsForCombine = this.conf.getInt("tez.runtime.combine.min.spills", 3);
        this.deflater = TezCommonUtils.newBestCompressionDeflater();
        this.finalEvents = Lists.newLinkedList();
    }

    ByteBuffer allocateSpace() {
        if (this.currentAllocatableMemory <= 0L) {
            return null;
        }
        int size = this.computeBlockSize(this.currentAllocatableMemory, this.availableMemoryMb << 20);
        this.currentAllocatableMemory -= (long)size;
        int sizeWithoutMeta = size - size % 16;
        ByteBuffer space = ByteBuffer.allocate(sizeWithoutMeta);
        this.buffers.add(space);
        ++this.bufferIndex;
        this.bufferUsage.add(0);
        Preconditions.checkState((this.buffers.size() <= this.maxNumberOfBlocks ? 1 : 0) != 0, (Object)("Number of blocks " + this.buffers.size() + " is exceeding  " + this.maxNumberOfBlocks));
        LOG.info("Newly allocated block size=" + size + ", index=" + this.bufferIndex + ", Number of buffers=" + this.buffers.size() + ", currentAllocatableMemory=" + this.currentAllocatableMemory + ", currentBufferSize=" + space.capacity() + ", total=" + (this.availableMemoryMb << 20));
        return space;
    }

    @VisibleForTesting
    int computeBlockSize(long availableMem, long maxAllocatedMemory) {
        int maxMem;
        int maxBlockSize = 0;
        if (this.lazyAllocateMem && (this.buffers == null || this.buffers.isEmpty())) {
            return 33554368;
        }
        if (availableMem < (long)(maxBlockSize = Math.max(this.MIN_BLOCK_SIZE, maxBlockSize))) {
            maxBlockSize = (int)availableMem;
        }
        int n = maxMem = maxAllocatedMemory > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)maxAllocatedMemory;
        if (maxBlockSize > maxMem) {
            maxBlockSize = maxMem;
        }
        if ((availableMem -= (long)maxBlockSize) < (long)this.MIN_BLOCK_SIZE && (long)maxBlockSize + availableMem < Integer.MAX_VALUE) {
            maxBlockSize = (int)((long)maxBlockSize + availableMem);
        }
        return maxBlockSize;
    }

    private int bitcount(int n) {
        int bit = 0;
        while (n != 0) {
            ++bit;
            n >>= 1;
        }
        return bit;
    }

    public void sort() throws IOException {
        SortSpan newSpan = this.span.next();
        if (newSpan == null) {
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            this.merger.add(this.span.sort(this.sorter));
            boolean ret = this.spill(true);
            stopWatch.stop();
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.outputContext.getInputOutputVertexNames() + ": Time taken for spill " + stopWatch.now(TimeUnit.MILLISECONDS) + " ms");
            }
            if (this.pipelinedShuffle && ret) {
                this.sendPipelinedShuffleEvents();
            }
            this.bufferIndex = (this.bufferIndex + 1) % this.buffers.size();
            this.bufferUsage.set(this.bufferIndex, this.bufferUsage.get(this.bufferIndex) + 1);
            int items = 0x100000;
            int perItem = 16;
            if (this.span.length() != 0) {
                items = this.span.length();
                perItem = this.span.kvbuffer.limit() / items;
                items = this.span.capacity / (16 + perItem);
                if (items > 0x100000) {
                    items = 0x100000;
                }
            }
            Preconditions.checkArgument((this.buffers.get(this.bufferIndex) != null ? 1 : 0) != 0, (Object)"block should not be empty");
            this.span = new SortSpan((ByteBuffer)this.buffers.get(this.bufferIndex).clear(), 0x100000, perItem, ConfigUtils.getIntermediateOutputKeyComparator(this.conf));
        } else {
            SortTask task = new SortTask(this.span, this.sorter);
            LOG.debug("Submitting span={} for sort", (Object)this.span.toString());
            Future<SpanIterator> future = this.sortmaster.submit(task);
            this.merger.add(future);
            this.span = newSpan;
        }
        this.valSerializer.open((OutputStream)this.span.out);
        this.keySerializer.open((OutputStream)this.span.out);
    }

    private void sendPipelinedShuffleEvents() throws IOException {
        LinkedList events = Lists.newLinkedList();
        String pathComponent = this.outputContext.getUniqueIdentifier() + "_" + (this.numSpills - 1);
        ShuffleUtils.generateEventOnSpill(events, this.isFinalMergeEnabled(), false, this.outputContext, this.numSpills - 1, this.indexCacheList.get(this.numSpills - 1), this.partitions, this.sendEmptyPartitionDetails, pathComponent, this.partitionStats, this.reportDetailedPartitionStats(), this.auxiliaryService, this.deflater);
        this.outputContext.sendEvents((List)events);
        LOG.info(this.outputContext.getInputOutputVertexNames() + ": Added spill event for spill (final update=false), spillId=" + (this.numSpills - 1));
    }

    @Override
    public void write(Object key, Object value) throws IOException {
        this.collect(key, value, this.partitioner.getPartition(key, value, this.partitions));
    }

    synchronized void collect(Object key, Object value, int partition) throws IOException {
        if (key.getClass() != this.serializationContext.getKeyClass()) {
            throw new IOException("Type mismatch in key from map: expected " + this.serializationContext.getKeyClass().getName() + ", received " + key.getClass().getName());
        }
        if (value.getClass() != this.serializationContext.getValueClass()) {
            throw new IOException("Type mismatch in value from map: expected " + this.serializationContext.getValueClass().getName() + ", received " + value.getClass().getName());
        }
        if (partition < 0 || partition >= this.partitions) {
            throw new IOException("Illegal partition for " + key + " (" + partition + ")");
        }
        if (this.span.kvmeta.remaining() < 16) {
            this.sort();
            if (this.span.length() == 0) {
                this.spillSingleRecord(key, value, partition);
                return;
            }
        }
        int keystart = this.span.kvbuffer.position();
        int valstart = -1;
        int valend = -1;
        try {
            this.keySerializer.serialize(key);
            valstart = this.span.kvbuffer.position();
            this.valSerializer.serialize(value);
            valend = this.span.kvbuffer.position();
        }
        catch (BufferOverflowException overflow) {
            this.span.kvbuffer.position(keystart);
            this.sort();
            if (this.span.length() == 0 || this.bufferOverflowRecursion > this.buffers.size()) {
                this.spillSingleRecord(key, value, partition);
                this.bufferOverflowRecursion = 0;
                return;
            }
            ++this.bufferOverflowRecursion;
            this.collect(key, value, partition);
            return;
        }
        if (this.bufferOverflowRecursion > 0) {
            --this.bufferOverflowRecursion;
        }
        int prefix = 0;
        if (this.hasher != null) {
            prefix = this.hasher.getProxy(key);
        }
        prefix = partition << 32 - this.partitionBits | prefix >>> this.partitionBits;
        this.span.kvmeta.put(prefix);
        this.span.kvmeta.put(keystart);
        this.span.kvmeta.put(valstart);
        this.span.kvmeta.put(valend - valstart);
        this.mapOutputRecordCounter.increment(1L);
        this.outputContext.notifyProgress();
        this.mapOutputByteCounter.increment((long)(valend - keystart));
    }

    private void adjustSpillCounters(long rawLength, long compLength) {
        if (!this.isFinalMergeEnabled()) {
            this.outputBytesWithOverheadCounter.increment(rawLength);
        } else if (this.numSpills > 0) {
            this.additionalSpillBytesWritten.increment(compLength);
            this.outputBytesWithOverheadCounter.setValue(0L);
        } else {
            this.outputBytesWithOverheadCounter.increment(rawLength);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void spillSingleRecord(Object key, Object value, int partition) throws IOException {
        TezSpillRecord spillRec = new TezSpillRecord(this.partitions);
        Path filename = this.mapOutputFile.getSpillFileForWrite(this.numSpills, -1L);
        Path indexFilename = this.mapOutputFile.getSpillIndexFileForWrite(this.numSpills, this.partitions * 24);
        this.spillFilePaths.put(this.numSpills, filename);
        TezSpillRecord.ensureSpillFilePermissions(filename, this.rfs);
        try (FSDataOutputStream out = this.rfs.create(filename, true, 4096);){
            LOG.info(this.outputContext.getInputOutputVertexNames() + ": Spilling to " + filename.toString() + ", indexFilename=" + indexFilename);
            for (int i = 0; i < this.partitions; ++i) {
                if (this.isThreadInterrupted()) {
                    return;
                }
                IFile.Writer writer = null;
                try {
                    long segmentStart = out.getPos();
                    if (!this.sendEmptyPartitionDetails || i == partition) {
                        writer = new IFile.Writer(this.serializationContext.getKeySerialization(), this.serializationContext.getValSerialization(), out, this.serializationContext.getKeyClass(), this.serializationContext.getValueClass(), this.codec, this.spilledRecordsCounter, null, false);
                    }
                    if (i == partition) {
                        long recordStart = out.getPos();
                        writer.append(key, value);
                        this.mapOutputRecordCounter.increment(1L);
                        this.mapOutputByteCounter.increment(out.getPos() - recordStart);
                    }
                    long rawLength = 0L;
                    long partLength = 0L;
                    if (writer != null) {
                        writer.close();
                        rawLength = writer.getRawLength();
                        partLength = writer.getCompressedLength();
                    }
                    this.adjustSpillCounters(rawLength, partLength);
                    TezIndexRecord rec = new TezIndexRecord(segmentStart, rawLength, partLength);
                    spillRec.putIndex(rec, i);
                    writer = null;
                    continue;
                }
                finally {
                    if (null != writer) {
                        writer.close();
                    }
                }
            }
            this.spillFileIndexPaths.put(this.numSpills, indexFilename);
            spillRec.writeToFile(indexFilename, this.conf, (FileSystem)this.localFs);
            this.indexCacheList.add(spillRec);
            ++this.numSpills;
            if (!this.isFinalMergeEnabled()) {
                this.fileOutputByteCounter.increment(this.rfs.getFileStatus(filename).getLen());
                this.numShuffleChunks.setValue((long)this.numSpills);
            }
            if (this.pipelinedShuffle) {
                this.sendPipelinedShuffleEvents();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public boolean spill(boolean ignoreEmptySpills) throws IOException {
        Path filename;
        TezSpillRecord spillRec;
        FSDataOutputStream out = null;
        try {
            boolean ret = this.merger.ready();
            if (!ret && ignoreEmptySpills) {
                boolean bl = false;
                return bl;
            }
            long size = this.capacity + (long)(this.partitions * 150);
            spillRec = new TezSpillRecord(this.partitions);
            filename = this.mapOutputFile.getSpillFileForWrite(this.numSpills, size);
            this.spillFilePaths.put(this.numSpills, filename);
            out = this.rfs.create(filename, true, 4096);
            TezSpillRecord.ensureSpillFilePermissions(filename, this.rfs);
            LOG.info(this.outputContext.getInputOutputVertexNames() + ": Spilling to " + filename.toString());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.info(this.outputContext.getInputOutputVertexNames() + ": Interrupted while waiting for mergers to complete");
            throw new IOInterruptedException(this.outputContext.getInputOutputVertexNames() + ": Interrupted while waiting for mergers to complete", e);
        }
        for (int i = 0; i < this.partitions; ++i) {
            if (this.isThreadInterrupted()) {
                return false;
            }
            this.outputContext.notifyProgress();
            TezRawKeyValueIterator kvIter = this.merger.filter(i);
            long segmentStart = out.getPos();
            IFile.Writer writer = null;
            boolean hasNext = kvIter.hasNext();
            if (hasNext || !this.sendEmptyPartitionDetails) {
                writer = new IFile.Writer(this.serializationContext.getKeySerialization(), this.serializationContext.getValSerialization(), out, this.serializationContext.getKeyClass(), this.serializationContext.getValueClass(), this.codec, this.spilledRecordsCounter, null, this.merger.needsRLE());
            }
            if (this.combiner == null) {
                while (kvIter.next()) {
                    writer.append(kvIter.getKey(), kvIter.getValue());
                }
            } else if (hasNext) {
                this.runCombineProcessor(kvIter, writer);
            }
            long rawLength = 0L;
            long partLength = 0L;
            if (writer != null) {
                writer.close();
                rawLength = writer.getRawLength();
                partLength = writer.getCompressedLength();
            }
            this.adjustSpillCounters(rawLength, partLength);
            TezIndexRecord rec = new TezIndexRecord(segmentStart, rawLength, partLength);
            spillRec.putIndex(rec, i);
            if (this.isFinalMergeEnabled() || !this.reportPartitionStats()) continue;
            int n = i;
            this.partitionStats[n] = this.partitionStats[n] + rawLength;
        }
        Path indexFilename = this.mapOutputFile.getSpillIndexFileForWrite(this.numSpills, this.partitions * 24);
        this.spillFileIndexPaths.put(this.numSpills, indexFilename);
        spillRec.writeToFile(indexFilename, this.conf, (FileSystem)this.localFs);
        this.indexCacheList.add(spillRec);
        ++this.numSpills;
        if (this.isFinalMergeEnabled()) return true;
        this.fileOutputByteCounter.increment(this.rfs.getFileStatus(filename).getLen());
        this.numShuffleChunks.setValue((long)this.numSpills);
        return true;
        finally {
            if (out != null) {
                out.close();
            }
        }
    }

    private boolean isThreadInterrupted() throws IOException {
        if (Thread.currentThread().isInterrupted()) {
            if (this.cleanup) {
                this.cleanup();
            }
            this.sortmaster.shutdownNow();
            LOG.info(this.outputContext.getInputOutputVertexNames() + ": Thread interrupted, cleaned up stale data, sorter threads shutdown=" + this.sortmaster.isShutdown() + ", terminated=" + this.sortmaster.isTerminated());
            return true;
        }
        return false;
    }

    @Override
    public void flush() throws IOException {
        String uniqueIdentifier = this.outputContext.getUniqueIdentifier();
        this.outputContext.notifyProgress();
        if (this.isThreadInterrupted()) {
            return;
        }
        try {
            LOG.info(this.outputContext.getInputOutputVertexNames() + ": Starting flush of map output");
            this.span.end();
            this.merger.add(this.span.sort(this.sorter));
            this.spill(false);
            this.sortmaster.shutdown();
            this.buffers.clear();
            if (this.indexCacheList.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(this.outputContext.getInputOutputVertexNames() + ": Index list is empty... returning");
                }
                return;
            }
            if (!this.isFinalMergeEnabled()) {
                int startIndex = this.pipelinedShuffle ? this.numSpills - 1 : 0;
                int endIndex = this.numSpills;
                for (int i = startIndex; i < endIndex; ++i) {
                    boolean isLastEvent = i == this.numSpills - 1;
                    String pathComponent = this.outputContext.getUniqueIdentifier() + "_" + i;
                    ShuffleUtils.generateEventOnSpill(this.finalEvents, this.isFinalMergeEnabled(), isLastEvent, this.outputContext, i, this.indexCacheList.get(i), this.partitions, this.sendEmptyPartitionDetails, pathComponent, this.partitionStats, this.reportDetailedPartitionStats(), this.auxiliaryService, this.deflater);
                    LOG.info(this.outputContext.getInputOutputVertexNames() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
                }
                return;
            }
            this.numAdditionalSpills.increment((long)(this.numSpills - 1));
            if (this.numSpills == 1) {
                Path filename = (Path)this.spillFilePaths.get(0);
                Path indexFilename = (Path)this.spillFileIndexPaths.get(0);
                this.finalOutputFile = this.mapOutputFile.getOutputFileForWriteInVolume(filename);
                this.finalIndexFile = this.mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename);
                this.sameVolRename(filename, this.finalOutputFile);
                this.sameVolRename(indexFilename, this.finalIndexFile);
                if (LOG.isDebugEnabled()) {
                    LOG.debug(this.outputContext.getInputOutputVertexNames() + ": numSpills=" + this.numSpills + ", finalOutputFile=" + this.finalOutputFile + ", finalIndexFile=" + this.finalIndexFile + ", filename=" + filename + ", indexFilename=" + indexFilename);
                }
                TezSpillRecord spillRecord = new TezSpillRecord(this.finalIndexFile, (FileSystem)this.localFs);
                if (this.reportPartitionStats()) {
                    for (int i = 0; i < spillRecord.size(); ++i) {
                        int n = i;
                        this.partitionStats[n] = this.partitionStats[n] + spillRecord.getIndex(i).getRawLength();
                    }
                }
                this.numShuffleChunks.setValue((long)this.numSpills);
                this.fileOutputByteCounter.increment(this.rfs.getFileStatus(this.finalOutputFile).getLen());
                return;
            }
            this.finalOutputFile = this.mapOutputFile.getOutputFileForWrite(0L);
            this.finalIndexFile = this.mapOutputFile.getOutputIndexFileForWrite(0L);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.outputContext.getInputOutputVertexNames() + ": numSpills: " + this.numSpills + ", finalOutputFile:" + this.finalOutputFile + ", finalIndexFile:" + this.finalIndexFile);
            }
            FSDataOutputStream finalOut = this.rfs.create(this.finalOutputFile, true, 4096);
            TezSpillRecord.ensureSpillFilePermissions(this.finalOutputFile, this.rfs);
            TezSpillRecord spillRec = new TezSpillRecord(this.partitions);
            for (int parts = 0; parts < this.partitions; ++parts) {
                boolean shouldWrite = false;
                ArrayList<TezMerger.Segment> segmentList = new ArrayList<TezMerger.Segment>(this.numSpills);
                for (int i = 0; i < this.numSpills; ++i) {
                    Path spillFilename = (Path)this.spillFilePaths.get(i);
                    TezIndexRecord indexRecord = this.indexCacheList.get(i).getIndex(parts);
                    if (!indexRecord.hasData() && this.sendEmptyPartitionDetails) continue;
                    shouldWrite = true;
                    TezMerger.DiskSegment s = new TezMerger.DiskSegment(this.rfs, spillFilename, indexRecord.getStartOffset(), indexRecord.getPartLength(), this.codec, this.ifileReadAhead, this.ifileReadAheadLength, this.ifileBufferSize, true);
                    segmentList.add(s);
                }
                int mergeFactor = this.conf.getInt("tez.runtime.io.sort.factor", 100);
                boolean sortSegments = segmentList.size() > mergeFactor;
                TezRawKeyValueIterator kvIter = TezMerger.merge(this.conf, this.rfs, this.serializationContext, this.codec, segmentList, mergeFactor, new Path(uniqueIdentifier), ConfigUtils.getIntermediateOutputKeyComparator(this.conf), this.progressable, sortSegments, true, null, this.spilledRecordsCounter, this.additionalSpillBytesRead, null, this.merger.needsRLE());
                long segmentStart = finalOut.getPos();
                long rawLength = 0L;
                long partLength = 0L;
                if (shouldWrite) {
                    IFile.Writer writer = new IFile.Writer(this.serializationContext.getKeySerialization(), this.serializationContext.getValSerialization(), finalOut, this.serializationContext.getKeyClass(), this.serializationContext.getValueClass(), this.codec, this.spilledRecordsCounter, null, this.merger.needsRLE());
                    if (this.combiner == null || this.numSpills < this.minSpillsForCombine) {
                        TezMerger.writeFile(kvIter, writer, this.progressable, 10000L);
                    } else {
                        this.runCombineProcessor(kvIter, writer);
                    }
                    writer.close();
                    rawLength = writer.getRawLength();
                    partLength = writer.getCompressedLength();
                }
                this.outputBytesWithOverheadCounter.increment(rawLength);
                TezIndexRecord rec = new TezIndexRecord(segmentStart, rawLength, partLength);
                spillRec.putIndex(rec, parts);
                if (!this.reportPartitionStats()) continue;
                int n = parts;
                this.partitionStats[n] = this.partitionStats[n] + rawLength;
            }
            this.numShuffleChunks.setValue(1L);
            this.fileOutputByteCounter.increment(this.rfs.getFileStatus(this.finalOutputFile).getLen());
            spillRec.writeToFile(this.finalIndexFile, this.conf, (FileSystem)this.localFs);
            finalOut.close();
            for (int i = 0; i < this.numSpills; ++i) {
                Path indexFilename = (Path)this.spillFileIndexPaths.get(i);
                Path spillFilename = (Path)this.spillFilePaths.get(i);
                this.rfs.delete(indexFilename, true);
                this.rfs.delete(spillFilename, true);
            }
            this.spillFileIndexPaths.clear();
            this.spillFilePaths.clear();
        }
        catch (InterruptedException ie) {
            if (this.cleanup) {
                this.cleanup();
            }
            Thread.currentThread().interrupt();
            throw new IOInterruptedException("Interrupted while closing Output", ie);
        }
    }

    @Override
    public final List<Event> close() throws IOException {
        super.close();
        return this.finalEvents;
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public boolean needsRLE() {
        return this.merger.needsRLE();
    }

    private final class SpanMerger
    implements PartitionedRawKeyValueIterator {
        InputByteBuffer key = new InputByteBuffer();
        InputByteBuffer value = new InputByteBuffer();
        int partition;
        private ArrayList<Future<SpanIterator>> futures = new ArrayList();
        private SpanHeap heap = new SpanHeap();
        private PartitionFilter partIter;
        private int gallop = 0;
        private SpanIterator horse;
        private long total = 0L;
        private long eq = 0L;

        public SpanMerger() {
            this.partIter = new PartitionFilter(this);
        }

        public final void add(SpanIterator iter) {
            if (iter.next()) {
                this.heap.add(iter);
            }
        }

        public final void add(Future<SpanIterator> iter) {
            this.futures.add(iter);
        }

        public final boolean ready() throws IOException, InterruptedException {
            int numSpanItr = this.futures.size();
            try {
                SpanIterator iter = null;
                while (this.futures.size() > 0) {
                    Future<SpanIterator> futureIter = this.futures.remove(0);
                    iter = futureIter.get();
                    this.add(iter);
                }
                StringBuilder sb = new StringBuilder();
                if (this.heap.size() == 0) {
                    return false;
                }
                for (SpanIterator sp : this.heap) {
                    sb.append(sp.toString());
                    sb.append(",");
                    this.total += (long)sp.span.length();
                    this.eq += sp.span.getEq();
                }
                LOG.info(PipelinedSorter.this.outputContext.getInputOutputVertexNames() + ": Heap = " + sb.toString());
                return true;
            }
            catch (ExecutionException e) {
                LOG.error("Heap size={}, total={}, eq={}, partition={}, gallop={}, totalItr={}, futures.size={}, destVertexName={}", new Object[]{this.heap.size(), this.total, this.eq, this.partition, this.gallop, numSpanItr, this.futures.size(), PipelinedSorter.this.outputContext.getDestinationVertexName(), e});
                throw new IOException(e);
            }
        }

        private SpanIterator pop() {
            if (this.gallop > 0) {
                --this.gallop;
                return this.horse;
            }
            SpanIterator current = this.heap.pop();
            SpanIterator next = (SpanIterator)this.heap.peek();
            if (next != null && current != null && this.horse == current) {
                this.gallop = current.bisect(next.getKey(), next.getPartition()) - 1;
            }
            this.horse = current;
            return current;
        }

        public boolean needsRLE() {
            return (double)this.eq > 0.1 * (double)this.total;
        }

        private SpanIterator peek() {
            if (this.gallop > 0) {
                return this.horse;
            }
            return (SpanIterator)this.heap.peek();
        }

        @Override
        public final boolean next() {
            SpanIterator current = this.pop();
            if (current != null) {
                this.partition = current.getPartition();
                this.key.reset(current.getKey());
                this.value.reset(current.getValue());
                if (this.gallop <= 0) {
                    this.add(current);
                } else {
                    current.next();
                }
                return true;
            }
            return false;
        }

        @Override
        public boolean hasNext() {
            return this.peek() != null;
        }

        @Override
        public Integer peekPartition() {
            if (!this.hasNext()) {
                return null;
            }
            SpanIterator peek = this.peek();
            return peek.getPartition();
        }

        @Override
        public DataInputBuffer getKey() {
            return this.key;
        }

        @Override
        public DataInputBuffer getValue() {
            return this.value;
        }

        @Override
        public int getPartition() {
            return this.partition;
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public Progress getProgress() {
            return new Progress();
        }

        @Override
        public boolean isSameKey() throws IOException {
            return false;
        }

        public TezRawKeyValueIterator filter(int partition) {
            this.partIter.reset(partition);
            return this.partIter;
        }
    }

    private static class SpanHeap
    extends PriorityQueue<SpanIterator> {
        private static final long serialVersionUID = 1L;

        public SpanHeap() {
            super(256);
        }

        public SpanIterator pop() {
            return (SpanIterator)this.poll();
        }
    }

    private class PartitionFilter
    implements TezRawKeyValueIterator {
        private final PartitionedRawKeyValueIterator iter;
        private int partition;
        private boolean dirty = false;

        public PartitionFilter(PartitionedRawKeyValueIterator iter) {
            this.iter = iter;
        }

        @Override
        public DataInputBuffer getKey() throws IOException {
            return this.iter.getKey();
        }

        @Override
        public DataInputBuffer getValue() throws IOException {
            return this.iter.getValue();
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public Progress getProgress() {
            return new Progress();
        }

        @Override
        public boolean isSameKey() throws IOException {
            return this.iter.isSameKey();
        }

        @Override
        public boolean next() throws IOException {
            if (this.dirty || this.iter.next()) {
                int prefix = this.iter.getPartition();
                if (prefix >>> 32 - PipelinedSorter.this.partitionBits == this.partition) {
                    this.dirty = false;
                    return true;
                }
                if (!this.dirty) {
                    this.dirty = true;
                }
            }
            return false;
        }

        @Override
        public boolean hasNext() throws IOException {
            Integer part;
            if ((this.dirty || this.iter.hasNext()) && (part = this.dirty ? Integer.valueOf(this.iter.getPartition()) : this.iter.peekPartition()) != null) {
                return part >>> 32 - PipelinedSorter.this.partitionBits == this.partition;
            }
            return false;
        }

        public void reset(int partition) {
            this.partition = partition;
        }

        public int getPartition() {
            return this.partition;
        }
    }

    private static class SortTask
    extends CallableWithNdc<SpanIterator> {
        private final SortSpan sortable;
        private final IndexedSorter sorter;

        public SortTask(SortSpan sortable, IndexedSorter sorter) {
            this.sortable = sortable;
            this.sorter = sorter;
        }

        protected SpanIterator callInternal() {
            return this.sortable.sort(this.sorter);
        }
    }

    private static class SpanIterator
    implements PartitionedRawKeyValueIterator,
    Comparable<SpanIterator> {
        private int kvindex = -1;
        private final int maxindex;
        private final IntBuffer kvmeta;
        private final ByteBuffer kvbuffer;
        private final SortSpan span;
        private final InputByteBuffer key = new InputByteBuffer();
        private final InputByteBuffer value = new InputByteBuffer();
        private final Progress progress = new LocalProgress();
        private static final int minrun = 16;

        public SpanIterator(SortSpan span) {
            this.kvmeta = span.kvmeta;
            this.kvbuffer = span.kvbuffer;
            this.span = span;
            this.maxindex = this.kvmeta.limit() / 4 - 1;
        }

        @Override
        public DataInputBuffer getKey() {
            int keystart = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 1);
            int valstart = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 2);
            byte[] buf = this.kvbuffer.array();
            int off = this.kvbuffer.arrayOffset();
            this.key.reset(buf, off + keystart, valstart - keystart);
            return this.key;
        }

        @Override
        public DataInputBuffer getValue() {
            int valstart = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 2);
            int vallen = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 3);
            byte[] buf = this.kvbuffer.array();
            int off = this.kvbuffer.arrayOffset();
            this.value.reset(buf, off + valstart, vallen);
            return this.value;
        }

        @Override
        public boolean next() {
            if (this.kvindex == this.maxindex) {
                return false;
            }
            ++this.kvindex;
            if (this.kvindex % 100 == 0) {
                this.progress.set(1.0f - (float)(this.maxindex - this.kvindex) / (float)this.maxindex);
            }
            return true;
        }

        @Override
        public boolean hasNext() {
            return this.kvindex == this.maxindex;
        }

        @Override
        public void close() {
        }

        @Override
        public Progress getProgress() {
            return this.progress;
        }

        @Override
        public boolean isSameKey() throws IOException {
            return false;
        }

        @Override
        public int getPartition() {
            int partition = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 0);
            return partition;
        }

        @Override
        public Integer peekPartition() {
            if (!this.hasNext()) {
                return null;
            }
            return this.kvmeta.get(this.span.offsetFor(this.kvindex + 1) + 0);
        }

        public int size() {
            return this.maxindex - this.kvindex;
        }

        @Override
        public int compareTo(SpanIterator other) {
            return this.span.compareInternal(other.getKey(), other.getPartition(), this.kvindex);
        }

        public String toString() {
            return String.format("SpanIterator<%d:%d> (span=%s)", this.kvindex, this.maxindex, this.span.toString());
        }

        int bisect(DataInputBuffer needle, int needlePart) {
            int start = this.kvindex;
            int end = this.maxindex - 1;
            int mid = start;
            int cmp = 0;
            if (end - start < 16) {
                return 0;
            }
            if (this.span.compareInternal(needle, needlePart, start) > 0) {
                return this.kvindex;
            }
            if (this.span.compareInternal(needle, needlePart, start + 16) > 0) {
                return 0;
            }
            if (this.span.compareInternal(needle, needlePart, end) < 0) {
                return end - this.kvindex;
            }
            boolean found = false;
            for (int i = 0; start < end && i < 16; ++i) {
                mid = start + (end - start) / 2;
                cmp = this.span.compareInternal(needle, needlePart, mid);
                if (cmp == 0) {
                    start = mid;
                    found = true;
                } else if (cmp < 0) {
                    start = mid;
                    found = true;
                }
                if (cmp <= 0) continue;
                end = mid;
            }
            if (found) {
                return start - this.kvindex;
            }
            return 0;
        }
    }

    private final class SortSpan
    implements IndexedSortable {
        final IntBuffer kvmeta;
        final byte[] rawkvmeta;
        final int kvmetabase;
        final ByteBuffer kvbuffer;
        final NonSyncDataOutputStream out;
        final RawComparator comparator;
        final byte[] imeta = new byte[16];
        private int index = 0;
        private long eq = 0L;
        private boolean reinit = false;
        private int capacity;

        public SortSpan(ByteBuffer source, int maxItems, int perItem, RawComparator comparator) {
            this.capacity = source.remaining();
            int metasize = 16 * maxItems;
            int dataSize = maxItems * perItem;
            if (this.capacity < metasize + dataSize) {
                metasize = 16 * (this.capacity / (perItem + 16));
            }
            ByteBuffer reserved = source.duplicate();
            reserved.mark();
            LOG.info(PipelinedSorter.this.outputContext.getInputOutputVertexNames() + ": reserved.remaining()=" + reserved.remaining() + ", reserved.metasize=" + metasize);
            reserved.position(metasize);
            this.kvbuffer = reserved.slice();
            reserved.flip();
            reserved.limit(metasize);
            ByteBuffer kvmetabuffer = reserved.slice();
            this.rawkvmeta = kvmetabuffer.array();
            this.kvmetabase = kvmetabuffer.arrayOffset();
            this.kvmeta = kvmetabuffer.order(ByteOrder.nativeOrder()).asIntBuffer();
            this.out = new NonSyncDataOutputStream((OutputStream)new BufferStreamWrapper(this.kvbuffer));
            this.comparator = comparator;
        }

        public SpanIterator sort(IndexedSorter sorter) {
            long start = System.currentTimeMillis();
            if (this.length() > 1) {
                sorter.sort((IndexedSortable)this, 0, this.length(), PipelinedSorter.this.progressable);
            }
            LOG.info(PipelinedSorter.this.outputContext.getInputOutputVertexNames() + ": done sorting span=" + this.index + ", length=" + this.length() + ", time=" + (System.currentTimeMillis() - start));
            return new SpanIterator(this);
        }

        int offsetFor(int i) {
            return i * 4;
        }

        public void swap(int mi, int mj) {
            int kvi = this.offsetFor(mi);
            int kvj = this.offsetFor(mj);
            int kvioff = this.kvmetabase + (kvi << 2);
            int kvjoff = this.kvmetabase + (kvj << 2);
            System.arraycopy(this.rawkvmeta, kvioff, this.imeta, 0, 16);
            System.arraycopy(this.rawkvmeta, kvjoff, this.rawkvmeta, kvioff, 16);
            System.arraycopy(this.imeta, 0, this.rawkvmeta, kvjoff, 16);
        }

        protected int compareKeys(int kvi, int kvj) {
            int off;
            int istart = this.kvmeta.get(kvi + 1);
            int jstart = this.kvmeta.get(kvj + 1);
            int ilen = this.kvmeta.get(kvi + 2) - istart;
            int jlen = this.kvmeta.get(kvj + 2) - jstart;
            if (ilen == 0 || jlen == 0) {
                if (ilen == jlen) {
                    ++this.eq;
                }
                return ilen - jlen;
            }
            byte[] buf = this.kvbuffer.array();
            int cmp = this.comparator.compare(buf, (off = this.kvbuffer.arrayOffset()) + istart, ilen, buf, off + jstart, jlen);
            if (cmp == 0) {
                ++this.eq;
            }
            return cmp;
        }

        public int compare(int mi, int mj) {
            int kvjp;
            int kvi = this.offsetFor(mi);
            int kvj = this.offsetFor(mj);
            int kvip = this.kvmeta.get(kvi + 0);
            if (kvip != (kvjp = this.kvmeta.get(kvj + 0))) {
                return kvip - kvjp;
            }
            return this.compareKeys(kvi, kvj);
        }

        public SortSpan next() {
            ByteBuffer remaining = this.end();
            if (remaining != null) {
                RawComparator newComparator;
                SortSpan newSpan = null;
                int items = this.length();
                int perItem = this.kvbuffer.position() / items;
                if (this.reinit) {
                    items = 0x100000;
                    perItem = 16;
                }
                if (this.comparator == (newComparator = ConfigUtils.getIntermediateOutputKeyComparator(PipelinedSorter.this.conf))) {
                    LOG.warn("Same comparator used. comparator={}, newComparator={}, hashCode: comparator={}, newComparator={}", new Object[]{this.comparator, newComparator, System.identityHashCode(this.comparator), System.identityHashCode(newComparator)});
                }
                newSpan = new SortSpan(remaining, items, perItem, newComparator);
                newSpan.index = this.index + 1;
                LOG.info(String.format(PipelinedSorter.this.outputContext.getInputOutputVertexNames() + ": New Span%d.length = %d, perItem = %d", newSpan.index, newSpan.length(), perItem) + ", counter:" + PipelinedSorter.this.mapOutputRecordCounter.getValue());
                return newSpan;
            }
            return null;
        }

        public int length() {
            return this.kvmeta.limit() / 4;
        }

        public ByteBuffer end() {
            ByteBuffer remaining = this.kvbuffer.duplicate();
            remaining.position(this.kvbuffer.position());
            remaining = remaining.slice();
            this.kvbuffer.limit(this.kvbuffer.position());
            this.kvmeta.limit(this.kvmeta.position());
            int items = this.length();
            if (items == 0) {
                return null;
            }
            int perItem = this.kvbuffer.position() / items;
            LOG.info(PipelinedSorter.this.outputContext.getInputOutputVertexNames() + ": " + String.format("Span%d.length = %d, perItem = %d", this.index, this.length(), perItem));
            if (remaining.remaining() < 16 + perItem) {
                ByteBuffer space = PipelinedSorter.this.allocateSpace();
                if (space != null) {
                    LOG.info(PipelinedSorter.this.outputContext.getInputOutputVertexNames() + ": Getting memory from next block in the list, recordsWritten=" + PipelinedSorter.this.mapOutputRecordCounter.getValue());
                    this.reinit = true;
                    return space;
                }
                return null;
            }
            return remaining;
        }

        public int compareInternal(DataInputBuffer needle, int needlePart, int index) {
            int cmp = 0;
            int partition = this.kvmeta.get(this.offsetFor(index) + 0);
            if (partition != needlePart) {
                cmp = partition - needlePart;
            } else {
                int keystart = this.kvmeta.get(this.offsetFor(index) + 1);
                int valstart = this.kvmeta.get(this.offsetFor(index) + 2);
                byte[] buf = this.kvbuffer.array();
                int off = this.kvbuffer.arrayOffset();
                cmp = this.comparator.compare(buf, keystart + off, valstart - keystart, needle.getData(), needle.getPosition(), needle.getLength() - needle.getPosition());
            }
            return cmp;
        }

        public long getEq() {
            return this.eq;
        }

        public String toString() {
            return String.format("Span[%d,%d]", 4 * this.kvmeta.capacity(), this.kvbuffer.limit());
        }
    }

    private static final class InputByteBuffer
    extends DataInputBuffer {
        private byte[] buffer = new byte[256];
        private ByteBuffer wrapped = ByteBuffer.wrap(this.buffer);

        private InputByteBuffer() {
        }

        private void resize(int length) {
            if (length > this.buffer.length || this.buffer.length > 10 * (1 + length)) {
                this.buffer = new byte[length];
                this.wrapped = ByteBuffer.wrap(this.buffer);
            }
            this.wrapped.limit(length);
        }

        public void reset(DataInputBuffer clone) {
            byte[] data = clone.getData();
            int start = clone.getPosition();
            int length = clone.getLength() - start;
            super.reset(data, start, length);
        }

        public void copy(DataInputBuffer clone) {
            byte[] data = clone.getData();
            int start = clone.getPosition();
            int length = clone.getLength() - start;
            this.resize(length);
            System.arraycopy(data, start, this.buffer, 0, length);
            super.reset(this.buffer, 0, length);
        }
    }

    private static class BufferStreamWrapper
    extends OutputStream {
        private final ByteBuffer out;

        public BufferStreamWrapper(ByteBuffer out) {
            this.out = out;
        }

        @Override
        public void write(int b) throws IOException {
            this.out.put((byte)b);
        }

        @Override
        public void write(byte[] b) throws IOException {
            this.out.put(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            this.out.put(b, off, len);
        }
    }

    private static interface PartitionedRawKeyValueIterator
    extends TezRawKeyValueIterator {
        public int getPartition();

        public Integer peekPartition();
    }
}

