/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.queryengine.execution.schedule;

import com.google.common.base.Ticker;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.execution.driver.IDriver;
import org.apache.iotdb.db.queryengine.execution.schedule.AbstractDriverThread;
import org.apache.iotdb.db.queryengine.execution.schedule.DriverTaskAbortedException;
import org.apache.iotdb.db.queryengine.execution.schedule.ExecutionContext;
import org.apache.iotdb.db.queryengine.execution.schedule.ITaskScheduler;
import org.apache.iotdb.db.queryengine.execution.schedule.ThreadProducer;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.multilevelqueue.MultilevelPriorityQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask;
import org.apache.iotdb.db.utils.SetThreadName;

public class DriverTaskThread
extends AbstractDriverThread {
    private static final double DRIVER_TASK_EXECUTION_TIME_SLICE_IN_MS = IoTDBDescriptor.getInstance().getConfig().getDriverTaskExecutionTimeSliceInMs();
    private static final Duration[] TIME_SLICE_FOR_EACH_LEVEL = (Duration[])IntStream.range(0, MultilevelPriorityQueue.getNumOfPriorityLevels()).mapToObj(level -> new Duration((double)(level + 1) * DRIVER_TASK_EXECUTION_TIME_SLICE_IN_MS, TimeUnit.MILLISECONDS)).toArray(Duration[]::new);
    private static final Executor listeningExecutor = IoTDBThreadPoolFactory.newCachedThreadPool((String)ThreadName.DRIVER_TASK_SCHEDULER_NOTIFICATION.getName());
    private final Ticker ticker = Ticker.systemTicker();

    public DriverTaskThread(String workerId, ThreadGroup tg, IndexedBlockingQueue<DriverTask> queue, ITaskScheduler scheduler, ThreadProducer producer) {
        super(workerId, tg, queue, scheduler, producer);
    }

    @Override
    public void execute(DriverTask task) throws InterruptedException {
        Duration timeSlice;
        long startNanos = this.ticker.read();
        if (!this.scheduler.readyToRunning(task)) {
            return;
        }
        IDriver driver = task.getDriver();
        ListenableFuture<?> future = driver.processFor(timeSlice = this.getExecutionTimeSliceForDriverTask(task));
        if (future.isCancelled()) {
            task.setAbortCause(new DriverTaskAbortedException(task.getDriverTaskId().getFullId(), "already being cancelled"));
            this.scheduler.toAborted(task);
            return;
        }
        long quantaScheduledNanos = this.ticker.read() - startNanos;
        ExecutionContext context = new ExecutionContext();
        context.setScheduledTimeInNanos(quantaScheduledNanos);
        context.setTimeSlice(timeSlice);
        if (driver.isFinished()) {
            this.scheduler.runningToFinished(task, context);
            return;
        }
        if (future.isDone()) {
            this.scheduler.runningToReady(task, context);
        } else {
            this.scheduler.runningToBlocked(task, context);
            future.addListener(() -> {
                try (SetThreadName driverTaskName2 = new SetThreadName(task.getDriver().getDriverTaskId().getFullId());){
                    this.scheduler.blockedToReady(task);
                }
            }, listeningExecutor);
        }
    }

    private Duration getExecutionTimeSliceForDriverTask(DriverTask driverTask) {
        if (driverTask.isHighestPriority()) {
            return TIME_SLICE_FOR_EACH_LEVEL[0];
        }
        return TIME_SLICE_FOR_EACH_LEVEL[driverTask.getPriority().getLevel()];
    }
}

