package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.flink.runtime.operators.sort.StageRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/sort/SortingThread.class */
/**
 * Stage thread that takes elements from the {@code SORT} stage, sorts their buffers and
 * forwards them to the {@code SPILL} stage.
 *
 * <p>Elements whose buffer is empty are reset and handed back to the {@code READ} stage
 * instead of being forwarded.
 *
 * @param <E> type of the elements carried by the circular buffers
 */
class SortingThread<E> extends ThreadBase<E> {
    private static final Logger LOG = LoggerFactory.getLogger(SortingThread.class);

    /** Sorting algorithm applied to every non-empty buffer. */
    private final IndexedSorter sorter;

    /**
     * Creates the sorting thread.
     *
     * @param exceptionHandler handler passed through to the base class; may be {@code null}
     * @param stageMessageDispatcher dispatcher used to receive elements from and send elements
     *     to the other stages
     */
    public SortingThread(@Nullable ExceptionHandler<IOException> exceptionHandler, StageRunner.StageMessageDispatcher<E> stageMessageDispatcher) {
        super(exceptionHandler, "SortMerger sorting thread", stageMessageDispatcher);
        this.sorter = new QuickSort();
    }

    /**
     * Main loop: repeatedly takes an element from the {@code SORT} stage until the thread is no
     * longer running or the EOF marker has been received.
     *
     * <ul>
     *   <li>EOF marker: forwarded to {@code SPILL}, then the loop ends.
     *   <li>SPILLING marker: forwarded to {@code SPILL} unchanged.
     *   <li>Empty buffer: reset and returned to {@code READ}; it is <em>not</em> forwarded.
     *   <li>Any other buffer: sorted, then forwarded to {@code SPILL}.
     * </ul>
     *
     * @throws InterruptedException if taking an element from the dispatcher is interrupted
     */
    @Override
    public void go() throws InterruptedException {
        boolean alive = true;

        while (isRunning() && alive) {
            final CircularElement<E> element = this.dispatcher.take(StageRunner.SortStage.SORT);

            if (element == CircularElement.EOF_MARKER) {
                LOG.debug("Sorting thread done.");
                alive = false;
            } else if (element != CircularElement.SPILLING_MARKER) {
                if (element.getBuffer().size() == 0) {
                    element.getBuffer().reset();
                    this.dispatcher.send(StageRunner.SortStage.READ, element);
                    // The buffer now belongs to the READ stage again; forwarding it to SPILL
                    // as well would hand the same element to two stages at once.
                    continue;
                }

                LOG.debug("Sorting buffer {}.", element.getId());
                this.sorter.sort(element.getBuffer());
                LOG.debug("Sorted buffer {}.", element.getId());
            }

            // Markers and freshly sorted buffers go on to the spilling stage.
            this.dispatcher.send(StageRunner.SortStage.SPILL, element);
        }
    }
}
