/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.query.calcite.exec.rel;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Downstream;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.SingleNode;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

/**
 * Root of an execution tree that exposes the rows pushed by its single source as an {@link Iterator}.
 *
 * <p>Rows arrive through {@link #push(Object)} / {@link #end()} and are collected in an input buffer;
 * {@link #hasNext()} / {@link #next()} drain an output buffer and swap the two buffers when the
 * output one is exhausted. State shared between the pushing side and the iterating side
 * ({@code waiting}, {@code inBuff}, the timing counters) is guarded by {@link #lock}.
 *
 * @param <Row> Row type.
 */
public class RootNode<Row>
extends AbstractNode<Row>
implements SingleNode<Row>,
Downstream<Row>,
Iterator<Row> {
    /** Guards {@code waiting}, {@code inBuff} and the timing counters. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Signalled when a full batch has arrived, the stream has ended, or the node has been closed. */
    private final Condition cond = lock.newCondition();

    /** Callback invoked by {@link #close()} with the recorded failure (may be {@code null}). */
    private final Consumer<Throwable> onClose;

    /** First recorded failure; later failures are attached to it as suppressed. */
    private final AtomicReference<Throwable> ex = new AtomicReference<>();

    /** Converts a buffered row before it is returned from {@link #next()}. */
    private final Function<Row, Row> converter;

    /**
     * Number of rows still expected for the current request: positive while a batch is in flight,
     * {@code 0} when nothing is requested, {@code -1} once {@link #end()} has been called.
     */
    private int waiting;

    /** Buffer filled by {@link #push(Object)}. */
    private Deque<Row> inBuff = new ArrayDeque<Row>(IN_BUFFER_SIZE);

    /** Buffer drained by {@link #next()}. */
    private Deque<Row> outBuff = new ArrayDeque<Row>(IN_BUFFER_SIZE);

    /** Accumulated nanoseconds between completion of a batch and the next request. */
    private long idleTime;

    /** Accumulated nanoseconds between issuing a request and completion of the batch (or end of stream). */
    private long execTime;

    /** {@link System#nanoTime()} of the last timing checkpoint; reset to {@code 0} by {@link #end()}. */
    private long prevTs;

    /** Set once {@link #close()} has run. */
    private volatile boolean closed;

    /**
     * Creates a root node whose close callback is {@link #closeInternal()}.
     *
     * @param ctx Execution context.
     * @param rowType Row type.
     */
    public RootNode(ExecutionContext<Row> ctx, RelDataType rowType) {
        super(ctx, rowType);
        this.onClose = t -> closeInternal();
        this.converter = TypeUtils.resultTypeConverter(ctx, rowType);
        this.prevTs = System.nanoTime();
    }

    /**
     * Creates a root node with a custom close callback.
     *
     * @param ctx Execution context.
     * @param rowType Row type.
     * @param onClose Callback invoked by {@link #close()} with the recorded failure, if any.
     */
    public RootNode(ExecutionContext<Row> ctx, RelDataType rowType, Consumer<Throwable> onClose) {
        super(ctx, rowType);
        this.onClose = onClose;
        this.converter = TypeUtils.resultTypeConverter(ctx, rowType);
        this.prevTs = System.nanoTime();
    }

    /**
     * @return Query ID taken from the execution context.
     */
    public UUID queryId() {
        return context().queryId();
    }

    /**
     * Closes the node, wakes up any thread waiting for rows and invokes the close callback.
     *
     * <p>If the stream has not ended yet ({@code waiting != -1}) and no failure is recorded, a
     * "query was cancelled" {@link IgniteSQLException} is recorded as the failure. The method is
     * idempotent: the closed flag is re-checked under the lock, so concurrent callers cannot both
     * run the close callback.
     */
    @Override
    public void close() {
        // Cheap fast path; the authoritative check is repeated under the lock below.
        if (closed) {
            return;
        }
        lock.lock();
        try {
            if (closed) {
                // Another caller closed the node between the fast-path check and lock acquisition.
                return;
            }
            if (waiting != -1) {
                ex.compareAndSet(null, new IgniteSQLException("The query was cancelled while executing.", 3014, (Throwable)new QueryCancelledException()));
            }
            // The failure has to be recorded before the flag is raised: hasNext() reads them in the opposite order.
            closed = true;
            cond.signalAll();
        }
        finally {
            lock.unlock();
        }
        onClose.accept(ex.get());
    }

    /**
     * @return Recorded failure, or {@code null} if there is none.
     */
    @Nullable
    public Throwable failure() {
        return ex.get();
    }

    /** {@inheritDoc} */
    @Override
    protected boolean isClosed() {
        return closed;
    }

    /**
     * Submits a task to the execution context that quietly closes every source of this node.
     */
    @Override
    public void closeInternal() {
        context().execute((RunnableX & Serializable)() -> sources().forEach(IgniteUtils::closeQuiet), this::onError);
    }

    /**
     * Accepts a row from the source and stores it in the input buffer.
     *
     * <p>Waiters are signalled when the input buffer reaches {@code IN_BUFFER_SIZE}; when the last
     * row of the current request arrives, the elapsed time is added to the execution time.
     *
     * @param row Row to buffer.
     * @throws Exception If {@code checkState()} rejects the call.
     */
    @Override
    public void push(Row row) throws Exception {
        lock.lock();
        try {
            assert waiting > 0;
            checkState();
            --waiting;
            inBuff.offer(row);
            if (inBuff.size() == IN_BUFFER_SIZE) {
                cond.signalAll();
            }
            if (waiting == 0) {
                long now = System.nanoTime();
                execTime += now - prevTs;
                prevTs = now;
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Marks the end of the stream: sets {@code waiting} to {@code -1}, finalizes the execution time
     * and wakes up waiters.
     *
     * @throws Exception If {@code checkState()} rejects the call.
     */
    @Override
    public void end() throws Exception {
        lock.lock();
        try {
            // 'waiting' is guarded by the lock, so it is asserted only after the lock is taken (as in push()).
            assert waiting > 0;
            checkState();
            waiting = -1;
            execTime += System.nanoTime() - prevTs;
            prevTs = 0L;
            cond.signalAll();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Records the failure and closes the node.
     *
     * <p>The first failure wins; any later, different failure is attached to it as suppressed.
     *
     * @param e Failure to record.
     */
    @Override
    protected void onErrorInternal(Throwable e) {
        if (!ex.compareAndSet(null, e)) {
            Throwable first = ex.get();
            // Throwable.addSuppressed rejects self-suppression with IllegalArgumentException,
            // so a repeated report of the very same throwable is skipped.
            if (first != e) {
                first.addSuppressed(e);
            }
        }
        U.closeQuiet((AutoCloseable)this);
    }

    /**
     * Checks whether another row is available, waiting for the source if the output buffer is empty.
     *
     * @return {@code true} if {@link #next()} can return a row.
     * @throws IgniteSQLException If a failure has been recorded.
     */
    @Override
    public boolean hasNext() {
        checkException();
        if (!outBuff.isEmpty()) {
            return true;
        }
        if (closed && ex.get() == null) {
            return false;
        }
        exchangeBuffers();
        return !outBuff.isEmpty();
    }

    /**
     * @return Next row, passed through the result type converter.
     * @throws NoSuchElementException If there are no more rows.
     */
    @Override
    public Row next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return converter.apply(outBuff.remove());
    }

    /**
     * @param idx Source index; only {@code 0} is accepted.
     * @return This node.
     * @throws IndexOutOfBoundsException If {@code idx} is not {@code 0}.
     */
    @Override
    protected Downstream<Row> requestDownstream(int idx) {
        if (idx != 0) {
            throw new IndexOutOfBoundsException();
        }
        return this;
    }

    /** Not supported by the root node. */
    @Override
    public void onRegister(Downstream<Row> downstream) {
        throw new UnsupportedOperationException();
    }

    /** Not supported by the root node. */
    @Override
    protected void rewindInternal() {
        throw new UnsupportedOperationException();
    }

    /** Not supported by the root node. */
    @Override
    public void request(int rowsCnt) {
        throw new UnsupportedOperationException();
    }

    /**
     * @return Accumulated execution time in milliseconds, including the batch currently in flight.
     */
    public long execTime() {
        lock.lock();
        try {
            long inFlight = waiting > 0 ? System.nanoTime() - prevTs : 0L;
            return TimeUnit.NANOSECONDS.toMillis(execTime + inFlight);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * @return Accumulated idle time in milliseconds, including the current idle period if nothing is requested.
     */
    public long idleTime() {
        lock.lock();
        try {
            long curIdle = waiting == 0 ? System.nanoTime() - prevTs : 0L;
            return TimeUnit.NANOSECONDS.toMillis(idleTime + curIdle);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Waits until the output buffer has rows, the stream has ended, or a failure has been recorded.
     *
     * <p>When the input buffer is full or the stream has ended, the two buffers are swapped. When
     * the stream has ended the node is closed; when nothing is buffered and nothing is requested,
     * a new batch of {@code IN_BUFFER_SIZE} rows is requested from the source.
     *
     * @throws IgniteInterruptedException If the waiting thread is interrupted; the interrupt status is restored.
     * @throws IgniteSQLException If a failure has been recorded.
     */
    private void exchangeBuffers() {
        assert !F.isEmpty(sources()) && sources().size() == 1;
        lock.lock();
        try {
            while (ex.get() == null) {
                assert outBuff.isEmpty();
                if (inBuff.size() == IN_BUFFER_SIZE || waiting == -1) {
                    Deque<Row> swap = inBuff;
                    inBuff = outBuff;
                    outBuff = swap;
                }
                if (waiting == -1) {
                    // Called with the lock held; the lock is reentrant.
                    close();
                }
                else if (inBuff.isEmpty() && waiting == 0) {
                    int req = waiting = IN_BUFFER_SIZE;
                    context().execute((RunnableX & Serializable)() -> source().request(req), this::onError);
                    long now = System.nanoTime();
                    idleTime += now - prevTs;
                    prevTs = now;
                }
                if (!outBuff.isEmpty() || waiting == -1) {
                    break;
                }
                cond.await();
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new IgniteInterruptedException(e);
        }
        finally {
            lock.unlock();
        }
        checkException();
    }

    /**
     * Rethrows the recorded failure, if any: an {@link IgniteSQLException} as is, anything else
     * wrapped into a new {@link IgniteSQLException}.
     */
    private void checkException() {
        Throwable e = ex.get();
        if (e == null) {
            return;
        }
        if (e instanceof IgniteSQLException) {
            throw (IgniteSQLException)e;
        }
        throw new IgniteSQLException("An error occurred while query executing.", 1, e);
    }
}

