/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
import org.apache.iotdb.db.utils.datastructure.SortKey;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.utils.RamUsageEstimator;

public class StreamingAggregationOperator
extends AbstractOperator {
    private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(StreamingAggregationOperator.class);
    private final Operator child;
    private final List<TableAggregator> aggregators;
    private final int[] groupByChannels;
    private final TsBlockBuilder resultBuilder;
    private final ColumnBuilder[] resultColumnsBuilder;
    private boolean finished = false;
    private SortKey currentGroup;
    private final Comparator<SortKey> groupKeyComparator;
    private final Deque<TsBlock> outputs = new LinkedList<TsBlock>();

    public StreamingAggregationOperator(OperatorContext operatorContext, Operator child, List<Type> groupByTypes, List<Integer> groupByChannels, Comparator<SortKey> groupKeyComparator, List<TableAggregator> aggregators, long maxPartialMemory, boolean spillEnabled, long unSpillMemoryLimit) {
        this.operatorContext = operatorContext;
        this.child = child;
        this.groupByChannels = Ints.toArray(groupByChannels);
        this.groupKeyComparator = groupKeyComparator;
        this.aggregators = aggregators;
        this.resultBuilder = new TsBlockBuilder(Stream.concat(groupByTypes.stream().map(InternalTypeManager::getTSDataType), aggregators.stream().map(TableAggregator::getType)).collect(Collectors.toList()));
        this.resultColumnsBuilder = this.resultBuilder.getValueColumnBuilders();
        Preconditions.checkArgument((!spillEnabled ? 1 : 0) != 0, (Object)"spill is not supported");
    }

    @Override
    public ListenableFuture<?> isBlocked() {
        return this.child.isBlocked();
    }

    @Override
    public boolean hasNext() throws Exception {
        return !this.finished || this.retainedTsBlock != null || !this.outputs.isEmpty();
    }

    @Override
    public TsBlock next() throws Exception {
        if (this.retainedTsBlock != null) {
            return this.getResultFromRetainedTsBlock();
        }
        if (!this.outputs.isEmpty()) {
            this.resultTsBlock = this.outputs.removeFirst();
            return this.checkTsBlockSizeAndGetResult();
        }
        if (this.child.hasNextWithTimer()) {
            TsBlock block = this.child.nextWithTimer();
            if (block == null) {
                return null;
            }
            this.processInput(block);
        } else {
            if (this.currentGroup != null) {
                this.evaluateAndFlushGroup(this.currentGroup.tsBlock, this.currentGroup.rowIndex, true);
                this.currentGroup = null;
            }
            this.finished = true;
        }
        if (this.outputs.isEmpty()) {
            return null;
        }
        this.resultTsBlock = this.outputs.removeFirst();
        return this.checkTsBlockSizeAndGetResult();
    }

    private void processInput(TsBlock page) {
        Objects.requireNonNull(page, "page is null");
        if (this.currentGroup != null) {
            if (this.groupKeyComparator.compare(this.currentGroup, new SortKey(page, 0)) != 0) {
                this.evaluateAndFlushGroup(this.currentGroup.tsBlock, this.currentGroup.rowIndex, false);
            }
            this.currentGroup = null;
        }
        int startPosition = 0;
        while (true) {
            int nextGroupStart = this.findNextGroupStart(startPosition, page);
            this.addRowsToAggregators(page, startPosition, nextGroupStart - 1);
            if (nextGroupStart >= page.getPositionCount()) break;
            this.evaluateAndFlushGroup(page, startPosition, false);
            startPosition = nextGroupStart;
        }
        this.currentGroup = new SortKey(page, page.getPositionCount() - 1);
    }

    private void addRowsToAggregators(TsBlock page, int startPosition, int endPosition) {
        TsBlock region = page.getRegion(startPosition, endPosition - startPosition + 1);
        for (TableAggregator aggregator : this.aggregators) {
            aggregator.processBlock(region);
        }
    }

    private void resetAggregationBuilder() {
        for (TableAggregator aggregator : this.aggregators) {
            aggregator.reset();
        }
    }

    private void evaluateAndFlushGroup(TsBlock page, int position, boolean lastCalculate) {
        this.resultBuilder.declarePosition();
        for (int i = 0; i < this.groupByChannels.length; ++i) {
            Column input = page.getColumn(this.groupByChannels[i]);
            if (input.isNull(position)) {
                this.resultColumnsBuilder[i].appendNull();
                continue;
            }
            this.resultColumnsBuilder[i].write(input, position);
        }
        int offset = this.groupByChannels.length;
        for (int i = 0; i < this.aggregators.size(); ++i) {
            this.aggregators.get(i).evaluate(this.resultColumnsBuilder[offset + i]);
        }
        if (lastCalculate || this.resultBuilder.isFull()) {
            this.outputs.add(this.resultBuilder.build((Column)new RunLengthEncodedColumn((Column)AbstractTableScanOperator.TIME_COLUMN_TEMPLATE, this.resultBuilder.getPositionCount())));
            this.resultBuilder.reset();
        }
        this.resetAggregationBuilder();
    }

    private int findNextGroupStart(int startPosition, TsBlock page) {
        int positionCount = page.getPositionCount();
        for (int i = startPosition + 1; i < positionCount; ++i) {
            if (this.groupKeyComparator.compare(new SortKey(page, startPosition), new SortKey(page, i)) == 0) continue;
            return i;
        }
        return positionCount;
    }

    @Override
    public boolean isFinished() throws Exception {
        return this.finished && this.retainedTsBlock == null && this.outputs.isEmpty();
    }

    @Override
    public void close() throws Exception {
        this.child.close();
        this.aggregators.forEach(TableAggregator::close);
    }

    @Override
    public OperatorContext getOperatorContext() {
        return this.operatorContext;
    }

    @Override
    public long calculateMaxPeekMemory() {
        return Math.max(this.child.calculateMaxPeekMemoryWithCounter(), this.calculateRetainedSizeAfterCallingNext() + this.calculateMaxReturnSize());
    }

    @Override
    public long calculateMaxReturnSize() {
        return this.maxReturnSize;
    }

    @Override
    public long calculateRetainedSizeAfterCallingNext() {
        return this.child.calculateMaxReturnSize() + this.child.calculateRetainedSizeAfterCallingNext();
    }

    public long ramBytesUsed() {
        return INSTANCE_SIZE + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(this.child) + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(this.operatorContext) + this.outputs.stream().mapToLong(TsBlock::getRetainedSizeInBytes).sum() + this.resultBuilder.getRetainedSizeInBytes();
    }
}

