/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.task.executor.container;

import ch.qos.logback.classic.ClassicConstants;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.Generated;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
import org.apache.dolphinscheduler.task.executor.container.ITaskExecutorContainer;
import org.apache.dolphinscheduler.task.executor.container.TaskExecutorAssignmentTable;
import org.apache.dolphinscheduler.task.executor.container.TaskExecutorContainerConfig;
import org.apache.dolphinscheduler.task.executor.exceptions.TaskExecutorRuntimeException;
import org.apache.dolphinscheduler.task.executor.worker.TaskExecutorWorker;
import org.apache.dolphinscheduler.task.executor.worker.TaskExecutorWorkers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of {@link ITaskExecutorContainer}.
 *
 * <p>The container creates a thread pool through
 * {@code ThreadUtils.newDaemonFixedThreadExecutor} and a {@link TaskExecutorWorkers} group, both
 * sized from {@link TaskExecutorContainerConfig#getTaskExecutorThreadPoolSize()}, and submits every
 * worker's {@code start} method to that pool at construction time. It also keeps a
 * {@link TaskExecutorAssignmentTable} recording which worker each task executor was handed to.
 *
 * <p>Subclasses decide how a worker is picked for a task executor by implementing
 * {@link #getTaskExecutorWorkerCandidate(ITaskExecutor)}.
 */
public abstract class AbstractTaskExecutorContainer implements ITaskExecutorContainer {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractTaskExecutorContainer.class);

    /** Records which worker each dispatched task executor is assigned to. */
    private final TaskExecutorAssignmentTable taskExecutorAssignmentTable;

    /** Pool on which every worker's {@code start} method is run. */
    protected final ThreadPoolExecutor taskExecutorThreadPool;

    /** The workers managed by this container. */
    protected final TaskExecutorWorkers taskExecutorWorkers;

    /**
     * Builds the thread pool, the workers and the assignment table, then starts all workers.
     *
     * @param containerConfig supplies the container name (used as the thread-name prefix
     *                        {@code <name>-worker-%d}) and the thread pool size
     */
    public AbstractTaskExecutorContainer(TaskExecutorContainerConfig containerConfig) {
        String threadPoolFormat = containerConfig.getContainerName() + "-worker-%d";
        int threadPoolSize = containerConfig.getTaskExecutorThreadPoolSize();
        this.taskExecutorThreadPool = ThreadUtils.newDaemonFixedThreadExecutor(threadPoolFormat, threadPoolSize);
        this.taskExecutorWorkers = new TaskExecutorWorkers(threadPoolSize);
        this.taskExecutorAssignmentTable = new TaskExecutorAssignmentTable();
        startAllThreadTaskExecutorWorker();
    }

    /**
     * Assigns the task executor to a worker chosen by
     * {@link #getTaskExecutorWorkerCandidate(ITaskExecutor)} and records the assignment.
     *
     * <p>Candidate selection and both registrations happen while holding this container's monitor,
     * so concurrent {@code dispatch} calls are serialized against each other.
     *
     * @param taskExecutor the task executor to assign
     * @throws TaskExecutorRuntimeException if no worker candidate is available
     */
    @Override
    public void dispatch(ITaskExecutor taskExecutor) {
        synchronized (this) {
            Optional<TaskExecutorWorker> candidate = getTaskExecutorWorkerCandidate(taskExecutor);
            if (!candidate.isPresent()) {
                log.info("All ExclusiveThreadTaskExecutorWorker are busy, cannot submit taskExecutor(id={})", taskExecutor.getId());
                throw new TaskExecutorRuntimeException("All ExclusiveThreadTaskExecutorWorker are busy");
            }
            TaskExecutorWorker worker = candidate.get();
            // Register on the worker first, then record the assignment so start()/finalize() can find it.
            worker.registerTaskExecutor(taskExecutor);
            taskExecutorAssignmentTable.registerTaskExecutor(taskExecutor, worker);
        }
    }

    /**
     * Fires the task executor on the worker it was previously dispatched to.
     *
     * @param taskExecutor a task executor that has already been dispatched
     * @throws IllegalStateException if the assignment table has no worker id for the task executor
     */
    @Override
    public void start(ITaskExecutor taskExecutor) {
        Integer workerId = taskExecutorAssignmentTable.getTaskExecutorWorkerId(taskExecutor);
        if (workerId == null) {
            throw new IllegalStateException("The taskExecutor: " + taskExecutor.getId() + " is not registered to any worker");
        }
        taskExecutorWorkers.getWorkerById(workerId).fireTaskExecutor(taskExecutor);
    }

    /**
     * Pauses the task executor by delegating to {@link ITaskExecutor#pause()}.
     *
     * @param taskExecutor the task executor to pause
     */
    @Override
    public void pause(ITaskExecutor taskExecutor) {
        taskExecutor.pause();
    }

    /**
     * Kills the task executor by delegating to {@link ITaskExecutor#kill()}.
     *
     * @param taskExecutor the task executor to kill
     */
    @Override
    public void kill(ITaskExecutor taskExecutor) {
        taskExecutor.kill();
    }

    /**
     * Releases the task executor from this container.
     *
     * <p>If the task executor is registered in the assignment table, it is unregistered from its
     * worker and then from the table. Afterwards the logback {@code FINALIZE_SESSION_MARKER} event
     * is logged and the task log is pushed to remote storage when remote logging is enabled.
     *
     * <p>The marker and the log push run in a {@code finally} block: a failure while unregistering
     * still propagates to the caller, but no longer prevents the logging session from being
     * finalized or the task log from being uploaded.
     *
     * @param taskExecutor the task executor to release
     */
    @Override
    public void finalize(ITaskExecutor taskExecutor) {
        try {
            if (taskExecutorAssignmentTable.isTaskExecutorRegistered(taskExecutor)) {
                Integer taskExecutorWorkerId = taskExecutorAssignmentTable.getTaskExecutorWorkerId(taskExecutor);
                taskExecutorWorkers.getWorkerById(taskExecutorWorkerId).unRegisterTaskExecutor(taskExecutor);
                taskExecutorAssignmentTable.unregisterTaskExecutor(taskExecutor);
            }
        } finally {
            // Marker event for logback; presumably consumed by a per-task sifting appender so the
            // task's log file can be closed - depends on the logging configuration in use.
            log.info(ClassicConstants.FINALIZE_SESSION_MARKER, ClassicConstants.FINALIZE_SESSION_MARKER.toString());
            pushTaskExecutorLogToRemote(taskExecutor);
        }
    }

    /**
     * Computes the fraction of workers that currently have at least one registered task executor.
     *
     * @return busy workers divided by total workers; {@code NaN} if the worker list is empty
     */
    @Override
    public double slotUsage() {
        List<TaskExecutorWorker> allWorkers = taskExecutorWorkers.getWorkers();
        long activeWorkerCount = allWorkers.stream()
                .filter(worker -> worker.getRegisteredTaskExecutorSize() != 0)
                .count();
        return (double) activeWorkerCount / (double) allWorkers.size();
    }

    /**
     * Exposes the assignment table for tests.
     *
     * @return the assignment table used by this container
     */
    @VisibleForTesting
    public TaskExecutorAssignmentTable getTaskExecutorAssignmentTable() {
        return taskExecutorAssignmentTable;
    }

    /**
     * Exposes the worker group for tests.
     *
     * @return the workers managed by this container
     */
    @VisibleForTesting
    public TaskExecutorWorkers getTaskExecutorWorkers() {
        return taskExecutorWorkers;
    }

    /** Submits each worker's {@code start} method to the thread pool. */
    private void startAllThreadTaskExecutorWorker() {
        for (TaskExecutorWorker taskExecutorWorker : taskExecutorWorkers.getWorkers()) {
            taskExecutorThreadPool.submit(taskExecutorWorker::start);
        }
    }

    /**
     * Selects the worker that should receive the given task executor.
     *
     * <p>Called from {@link #dispatch(ITaskExecutor)} while the container's monitor is held.
     *
     * @param taskExecutor the task executor being dispatched
     * @return the chosen worker, or an empty {@link Optional} if none can accept the task executor
     */
    protected abstract Optional<TaskExecutorWorker> getTaskExecutorWorkerCandidate(ITaskExecutor taskExecutor);

    /**
     * Uploads the task's log file to remote storage when remote logging is enabled.
     *
     * <p>Best effort: any exception raised while checking or sending is logged and not rethrown.
     *
     * @param taskExecutor the task executor whose log path is read from its execution context
     */
    private void pushTaskExecutorLogToRemote(ITaskExecutor taskExecutor) {
        TaskExecutionContext taskExecutionContext = taskExecutor.getTaskExecutionContext();
        try {
            if (RemoteLogUtils.isRemoteLoggingEnable()) {
                RemoteLogUtils.sendRemoteLog(taskExecutionContext.getLogPath());
                log.info("Send task log {} to remote storage successfully", taskExecutionContext.getLogPath());
            }
        } catch (Exception ex) {
            // Trailing Throwable argument is logged by SLF4J with its stack trace.
            log.error("Send task log {} to remote storage failed", taskExecutionContext.getLogPath(), ex);
        }
    }
}

