/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sortpartition;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.sort.PushSorter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorAttributes;
import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.sortpartition.FixedLengthByteKeyAndValueComparator;
import org.apache.flink.streaming.api.operators.sortpartition.KeyAndValueSerializer;
import org.apache.flink.streaming.api.operators.sortpartition.VariableLengthByteKeyAndValueComparator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.MutableObjectIterator;

/**
 * Bounded one-input operator that buffers every incoming record in an external sorter and emits
 * the buffered records, in the sorter's order, once the input has ended.
 *
 * <p>Each record is stored together with the serialized bytes of the operator's current key. The
 * sort field is given either as a tuple position, as a field expression, or through a
 * {@link KeySelector}; in the selector case the extracted sort value is stored next to the record
 * as a {@code Tuple2<sortValue, record>} and the sort is performed on field 0 of that tuple.
 *
 * <p>NOTE(review): the exact ordering (presumably key bytes first, then the sort field) is defined
 * by {@link FixedLengthByteKeyAndValueComparator} / {@link VariableLengthByteKeyAndValueComparator},
 * which are not visible here - confirm against those classes.
 *
 * @param <INPUT> type of the records that are sorted and forwarded unchanged
 * @param <KEY> type of the key the input stream is partitioned by
 */
@Internal
public class KeyedSortPartitionOperator<INPUT, KEY>
        extends AbstractStreamOperator<INPUT>
        implements OneInputStreamOperator<INPUT, INPUT>, BoundedOneInput {

    /** Type information of the input records. */
    protected final TypeInformation<INPUT> inputType;

    /** Extracts the sort value from a record; {@code null} when sorting by position or expression. */
    protected final KeySelector<INPUT, ?> sortFieldSelector;

    /** Requested sort direction. */
    private final Order sortOrder;

    /** Field expression to sort by; {@code null} when not sorting by expression. */
    private final String stringSortField;

    /** Tuple position to sort by; {@code -1} when not sorting by position. */
    private final int positionSortField;

    /** Sorter used when sorting by position or expression; created in {@code setup}. */
    private PushSorter<Tuple2<byte[], INPUT>> recordSorter;

    /** Sorter used when sorting through {@link #sortFieldSelector}; created in {@code setup}. */
    private PushSorter<Tuple2<byte[], Tuple2<?, INPUT>>> recordSorterForSelector;

    /** Serializer used to turn the current key into bytes. */
    private TypeSerializer<KEY> recordKeySerializer;

    /** Reusable buffer the current key is serialized into. */
    private DataOutputSerializer dataOutputSerializer;

    /**
     * Creates an operator that sorts by a field position.
     *
     * @param inputType type information of the input records
     * @param positionSortField position of the field to sort by
     * @param sortOrder sort direction
     * @throws InvalidProgramException if the field is not a valid sort key for {@code inputType}
     */
    public KeyedSortPartitionOperator(TypeInformation<INPUT> inputType, int positionSortField, Order sortOrder) {
        this.inputType = inputType;
        this.ensureFieldSortable(positionSortField);
        this.positionSortField = positionSortField;
        this.stringSortField = null;
        this.sortFieldSelector = null;
        this.sortOrder = sortOrder;
    }

    /**
     * Creates an operator that sorts by a field expression.
     *
     * @param inputType type information of the input records
     * @param stringSortField expression naming the field to sort by
     * @param sortOrder sort direction
     * @throws InvalidProgramException if the field is not a valid sort key for {@code inputType}
     */
    public KeyedSortPartitionOperator(TypeInformation<INPUT> inputType, String stringSortField, Order sortOrder) {
        this.inputType = inputType;
        this.ensureFieldSortable(stringSortField);
        this.positionSortField = -1;
        this.stringSortField = stringSortField;
        this.sortFieldSelector = null;
        this.sortOrder = sortOrder;
    }

    /**
     * Creates an operator that sorts by the value a {@link KeySelector} extracts from each record.
     *
     * @param inputType type information of the input records
     * @param sortFieldSelector extracts the value to sort by
     * @param sortOrder sort direction
     * @param <K> type of the extracted sort value
     * @throws InvalidProgramException if the extracted type is not a sortable key type
     */
    public <K> KeyedSortPartitionOperator(TypeInformation<INPUT> inputType, KeySelector<INPUT, K> sortFieldSelector, Order sortOrder) {
        this.inputType = inputType;
        this.ensureFieldSortable(sortFieldSelector);
        this.positionSortField = -1;
        this.stringSortField = null;
        this.sortFieldSelector = sortFieldSelector;
        this.sortOrder = sortOrder;
    }

    /**
     * Resolves the key serializer, allocates the key buffer and builds the external sorter that
     * matches the configured sort-field kind.
     */
    @Override
    protected void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<INPUT>> output) {
        super.setup(containingTask, config, output);
        ClassLoader userCodeClassLoader = containingTask.getUserCodeClassLoader();
        ExecutionConfig executionConfig = containingTask.getEnvironment().getExecutionConfig();
        this.recordKeySerializer = config.getStateKeySerializer(userCodeClassLoader);
        int keyLength = this.recordKeySerializer.getLength();
        this.createDataOutputSerializer(keyLength);

        if (this.sortFieldSelector != null) {
            // Records are stored as Tuple2<sortValue, record>; getSortFieldIndex() yields 0 here,
            // i.e. the extracted sort value.
            TypeInformation<Tuple2<?, INPUT>> valueType =
                    Types.TUPLE(TypeExtractor.getKeySelectorTypes(this.sortFieldSelector, this.inputType), this.inputType);
            KeyAndValueSerializer<Tuple2<?, INPUT>> valueSerializer =
                    new KeyAndValueSerializer<>(valueType.createSerializer(this.getExecutionConfig().getSerializerConfig()), keyLength);
            TypeComparator<Tuple2<?, INPUT>> valueComparator =
                    ((CompositeType<Tuple2<?, INPUT>>) valueType)
                            .createComparator(this.getSortFieldIndex(), this.getSortOrderIndicator(), 0, executionConfig);
            this.recordSorterForSelector =
                    this.getSorter(valueSerializer, wrapWithKeyComparator(keyLength, valueComparator), containingTask);
        } else {
            KeyAndValueSerializer<INPUT> valueSerializer =
                    new KeyAndValueSerializer<>(this.inputType.createSerializer(this.getExecutionConfig().getSerializerConfig()), keyLength);
            // NOTE(review): this cast assumes inputType is a CompositeType; confirm that the
            // constructor-time sortability checks rule out every non-composite type.
            TypeComparator<INPUT> valueComparator =
                    ((CompositeType<INPUT>) this.inputType)
                            .createComparator(this.getSortFieldIndex(), this.getSortOrderIndicator(), 0, executionConfig);
            this.recordSorter =
                    this.getSorter(valueSerializer, wrapWithKeyComparator(keyLength, valueComparator), containingTask);
        }
    }

    /**
     * Serializes the current key and hands the record, paired with those key bytes, to the sorter.
     *
     * @param element the incoming record
     */
    @Override
    public void processElement(StreamRecord<INPUT> element) throws Exception {
        // getCurrentKey() is untyped; the key serializer obtained in setup() is typed with KEY,
        // so the cast is required for the serialize call to compile.
        @SuppressWarnings("unchecked")
        KEY currentKey = (KEY) this.getCurrentKey();
        this.recordKeySerializer.serialize(currentKey, this.dataOutputSerializer);
        byte[] serializedKey = this.dataOutputSerializer.getCopyOfBuffer();
        // Reset the shared buffer so the next key starts at offset 0.
        this.dataOutputSerializer.clear();

        INPUT value = element.getValue();
        if (this.sortFieldSelector != null) {
            this.recordSorterForSelector.writeRecord(
                    Tuple2.of(serializedKey, Tuple2.of(this.sortFieldSelector.getKey(value), value)));
        } else {
            this.recordSorter.writeRecord(Tuple2.of(serializedKey, value));
        }
    }

    /**
     * Finishes the sorter's input, emits every buffered record in the sorter's order and closes
     * the sorter. The sorter is closed even when emitting fails.
     */
    @Override
    public void endInput() throws Exception {
        TimestampedCollector<INPUT> outputCollector = new TimestampedCollector<>(this.output);
        try {
            if (this.sortFieldSelector != null) {
                this.recordSorterForSelector.finishReading();
                MutableObjectIterator<Tuple2<byte[], Tuple2<?, INPUT>>> iterator = this.recordSorterForSelector.getIterator();
                for (Tuple2<byte[], Tuple2<?, INPUT>> record = iterator.next(); record != null; record = iterator.next()) {
                    // f1 is the (sortValue, record) pair; its f1 is the original record.
                    outputCollector.collect(record.f1.f1);
                }
            } else {
                this.recordSorter.finishReading();
                MutableObjectIterator<Tuple2<byte[], INPUT>> iterator = this.recordSorter.getIterator();
                for (Tuple2<byte[], INPUT> record = iterator.next(); record != null; record = iterator.next()) {
                    outputCollector.collect(record.f1);
                }
            }
        } finally {
            this.closeSorters();
        }
    }

    /**
     * Closes a sorter that is still open (for example because the task stopped before
     * {@link #endInput()} ran) and then delegates to the superclass.
     */
    @Override
    public void close() throws Exception {
        try {
            this.closeSorters();
        } finally {
            super.close();
        }
    }

    /**
     * Returns attributes with "output only after end of stream" and "internal sorter supported"
     * both set to {@code true}.
     */
    @Override
    public OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder()
                .setOutputOnlyAfterEndOfStream(true)
                .setInternalSorterSupported(true)
                .build();
    }

    /**
     * Computes the logical position of the sort field.
     *
     * @return a one-element array holding the logical key position of the configured position or
     *     expression, or {@code 0} when a {@link KeySelector} is used
     */
    private int[] getSortFieldIndex() {
        int[] sortFieldIndex = new int[1];
        if (this.positionSortField != -1) {
            sortFieldIndex[0] = new Keys.ExpressionKeys<INPUT>(this.positionSortField, this.inputType).computeLogicalKeyPositions()[0];
        } else if (this.stringSortField != null) {
            sortFieldIndex[0] = new Keys.ExpressionKeys<INPUT>(this.stringSortField, this.inputType).computeLogicalKeyPositions()[0];
        }
        // Selector case: the array keeps its default 0.
        return sortFieldIndex;
    }

    /**
     * @return a one-element array that is {@code true} exactly when the order is
     *     {@link Order#ASCENDING}
     */
    private boolean[] getSortOrderIndicator() {
        return new boolean[] {this.sortOrder == Order.ASCENDING};
    }

    /**
     * Verifies that the field at the given position can be used as a sort key.
     *
     * @throws InvalidProgramException if it cannot
     */
    private void ensureFieldSortable(int field) throws InvalidProgramException {
        if (!Keys.ExpressionKeys.isSortKey(field, this.inputType)) {
            throw new InvalidProgramException("The field " + field + " of input type " + this.inputType + " is not sortable.");
        }
    }

    /**
     * Verifies that the field named by the given expression can be used as a sort key.
     *
     * @throws InvalidProgramException if it cannot
     */
    private void ensureFieldSortable(String field) throws InvalidProgramException {
        if (!Keys.ExpressionKeys.isSortKey(field, this.inputType)) {
            throw new InvalidProgramException("The field " + field + " of input type " + this.inputType + " is not sortable.");
        }
    }

    /**
     * Verifies that the type produced by the given selector is a sortable key type.
     *
     * @throws InvalidProgramException if it is not
     */
    private <K> void ensureFieldSortable(KeySelector<INPUT, K> keySelector) {
        TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, this.inputType);
        Keys.SelectorFunctionKeys<INPUT, K> sortKey = new Keys.SelectorFunctionKeys<INPUT, K>(keySelector, this.inputType, keyType);
        if (!sortKey.getKeyType().isSortKeyType()) {
            throw new InvalidProgramException("The key type " + keyType + " is not sortable.");
        }
    }

    /**
     * Allocates the key buffer: sized to {@code keyLength} when it is positive, otherwise with an
     * initial size of 64 bytes.
     */
    private void createDataOutputSerializer(int keyLength) {
        int initialSize = keyLength > 0 ? keyLength : 64;
        this.dataOutputSerializer = new DataOutputSerializer(initialSize);
    }

    /**
     * Wraps a value comparator in the key-and-value comparator matching the serialized key length:
     * the fixed-length variant for a positive length, the variable-length variant otherwise.
     */
    private static <T> TypeComparator<Tuple2<byte[], T>> wrapWithKeyComparator(int keyLength, TypeComparator<T> valueComparator) {
        if (keyLength > 0) {
            return new FixedLengthByteKeyAndValueComparator<>(keyLength, valueComparator);
        }
        return new VariableLengthByteKeyAndValueComparator<>(valueComparator);
    }

    /** Closes whichever sorter is still open and clears the field so it is never closed twice. */
    private void closeSorters() throws Exception {
        PushSorter<?> selectorSorter = this.recordSorterForSelector;
        PushSorter<?> plainSorter = this.recordSorter;
        this.recordSorterForSelector = null;
        this.recordSorter = null;
        if (selectorSorter != null) {
            selectorSorter.close();
        }
        if (plainSorter != null) {
            plainSorter.close();
        }
    }

    /**
     * Builds an external sorter configured from the job configuration (spilling threshold, maximum
     * fan, large-record handling) and the operator's managed-memory fraction.
     *
     * @throws RuntimeException wrapping the {@link MemoryAllocationException} if the sorter's
     *     memory cannot be allocated
     */
    private <TYPE> PushSorter<TYPE> getSorter(TypeSerializer<TYPE> typeSerializer, TypeComparator<TYPE> typeComparator, StreamTask<?, ?> streamTask) {
        ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader();
        Configuration jobConfiguration = streamTask.getEnvironment().getJobConfiguration();
        double managedMemoryFraction =
                this.config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                        ManagedMemoryUseCase.OPERATOR,
                        jobConfiguration,
                        streamTask.getEnvironment().getTaskConfiguration(),
                        userCodeClassLoader);
        try {
            return ExternalSorter.newBuilder(
                            streamTask.getEnvironment().getMemoryManager(),
                            streamTask,
                            typeSerializer,
                            typeComparator,
                            streamTask.getExecutionConfig())
                    .memoryFraction(managedMemoryFraction)
                    .enableSpilling(
                            streamTask.getEnvironment().getIOManager(),
                            jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD).floatValue())
                    .maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
                    .objectReuse(streamTask.getExecutionConfig().isObjectReuseEnabled())
                    .largeRecords(jobConfiguration.get(AlgorithmOptions.USE_LARGE_RECORDS_HANDLER))
                    .build();
        } catch (MemoryAllocationException e) {
            throw new RuntimeException(e);
        }
    }
}

