/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.task.executor.eventbus;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
import org.apache.dolphinscheduler.task.executor.ITaskExecutorRepository;
import org.apache.dolphinscheduler.task.executor.eventbus.ITaskExecutorEventBusCoordinator;
import org.apache.dolphinscheduler.task.executor.eventbus.TaskExecutorEventBus;
import org.apache.dolphinscheduler.task.executor.events.ITaskExecutorLifecycleEvent;
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorDispatchedLifecycleEvent;
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorFailedLifecycleEvent;
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorFinalizeLifecycleEvent;
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorKillLifecycleEvent;
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorKilledLifecycleEvent;
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPauseLifecycleEvent;
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPausedLifecycleEvent;
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent;
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorStartedLifecycleEvent;
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorSuccessLifecycleEvent;
import org.apache.dolphinscheduler.task.executor.listener.ITaskExecutorLifecycleEventListener;
import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically scans every {@link ITaskExecutor} held by the repository and, for each one,
 * polls the head event of its {@link TaskExecutorEventBus} and delivers that event to all
 * registered {@link ITaskExecutorLifecycleEventListener}s.
 *
 * <p>A single scheduler thread performs the scan; the actual event delivery runs on a fixed
 * size worker pool. An executor id is held in {@link #firingTaskExecutorIds} for as long as a
 * delivery job for it is queued or running, so that at most one delivery job per executor id
 * is in flight at a time.
 */
public class TaskExecutorEventBusCoordinator implements ITaskExecutorEventBusCoordinator {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TaskExecutorEventBusCoordinator.class);

    /** Number of worker threads used to deliver events. */
    private static final int DEFAULT_WORKER_SIZE = Runtime.getRuntime().availableProcessors();

    /** Delay, in milliseconds, between the end of one scan and the start of the next. */
    private static final long DEFAULT_FIRE_INTERVAL = 50L;

    /**
     * Ids of executors that currently have a delivery job queued or running.
     * NOTE: this set is static, i.e. shared by every coordinator instance in the JVM.
     */
    private static final Set<Integer> firingTaskExecutorIds = ConcurrentHashMap.newKeySet();

    private final String coordinatorName;
    private final ITaskExecutorRepository taskExecutorRepository;
    private final List<ITaskExecutorLifecycleEventListener> taskExecutorLifecycleEventListeners;

    private ScheduledExecutorService mainExecutorThreadPool;
    private ThreadPoolExecutor workerExecutorThreadPool;

    /**
     * @param coordinatorName        name used as thread-name prefix and in log messages
     * @param taskExecutorRepository source of the task executors whose event buses are fired
     */
    public TaskExecutorEventBusCoordinator(String coordinatorName, ITaskExecutorRepository taskExecutorRepository) {
        this.coordinatorName = coordinatorName;
        this.taskExecutorRepository = taskExecutorRepository;
        this.taskExecutorLifecycleEventListeners = new ArrayList<>();
    }

    /**
     * Creates the worker pool and then starts the periodic scan.
     */
    @Override
    public void start() {
        // The worker pool must exist before the scan is scheduled: the scan starts with zero
        // initial delay and hands jobs to the worker pool right away.
        this.workerExecutorThreadPool = ThreadUtils.newDaemonFixedThreadExecutor(
                this.coordinatorName + "-eventbus-coordinator-worker-%d", DEFAULT_WORKER_SIZE);
        this.mainExecutorThreadPool = ThreadUtils.newDaemonScheduledExecutorService(
                this.coordinatorName + "-eventbus-coordinator-main-%d", 1);
        this.mainExecutorThreadPool.scheduleWithFixedDelay(
                this::fireTaskExecutorEventBus, 0L, DEFAULT_FIRE_INTERVAL, TimeUnit.MILLISECONDS);
        log.info("{} started, worker size: {}", this.coordinatorName, DEFAULT_WORKER_SIZE);
    }

    /**
     * Adds a listener that will receive every lifecycle event fired by this coordinator.
     *
     * @param taskExecutorLifecycleEventListener the listener to add; must not be {@code null}
     * @throws NullPointerException if the listener is {@code null}
     */
    @Override
    public void registerTaskExecutorLifecycleEventListener(ITaskExecutorLifecycleEventListener taskExecutorLifecycleEventListener) {
        Preconditions.checkNotNull(taskExecutorLifecycleEventListener);
        this.taskExecutorLifecycleEventListeners.add(taskExecutorLifecycleEventListener);
    }

    /**
     * Stops the periodic scan and shuts down the worker pool. Safe to call when
     * {@link #start()} was never invoked.
     */
    @Override
    public void close() {
        if (this.mainExecutorThreadPool != null) {
            this.mainExecutorThreadPool.shutdownNow();
        }
        if (this.workerExecutorThreadPool != null) {
            // Orderly shutdown: already queued delivery jobs still run, which also lets them
            // release their ids from firingTaskExecutorIds.
            this.workerExecutorThreadPool.shutdown();
        }
        log.info("{} closed", this.coordinatorName);
    }

    /**
     * One scan: submits a delivery job for every executor that does not already have one in
     * flight. Never throws; any failure is logged so that the periodic schedule keeps running.
     */
    private void fireTaskExecutorEventBus() {
        try {
            Collection<ITaskExecutor> taskExecutors = this.taskExecutorRepository.getAll();
            if (CollectionUtils.isEmpty(taskExecutors)) {
                return;
            }
            for (ITaskExecutor taskExecutor : taskExecutors) {
                Integer taskExecutorId = taskExecutor.getId();
                // Claim the id on this thread, atomically, BEFORE submitting. Marking it only
                // inside the worker job would let the next scan submit the same executor again
                // while the first job is still waiting in the worker queue.
                if (!firingTaskExecutorIds.add(taskExecutorId)) {
                    continue;
                }
                try {
                    CompletableFuture
                            .runAsync(() -> this.doFireTaskExecutorEventBus(taskExecutor), this.workerExecutorThreadPool)
                            .whenComplete((v, e) -> firingTaskExecutorIds.remove(taskExecutorId));
                } catch (RuntimeException submitFailure) {
                    // The job was never accepted (e.g. pool already shut down), so nothing
                    // will release the claim for us.
                    firingTaskExecutorIds.remove(taskExecutorId);
                    throw submitFailure;
                }
            }
        } catch (Throwable throwable) {
            log.error("Fire TaskExecutorEventBus error", throwable);
        }
    }

    /**
     * Polls at most one event from the executor's event bus and delivers it to every
     * registered listener. Exceptions raised during delivery are logged, not propagated.
     *
     * @param taskExecutor the executor whose event bus is polled
     */
    private void doFireTaskExecutorEventBus(ITaskExecutor taskExecutor) {
        try (TaskExecutorMDCUtils.MDCAutoClosable ignored = TaskExecutorMDCUtils.logWithMDC(taskExecutor)) {
            TaskExecutorEventBus taskExecutorEventBus = taskExecutor.getTaskExecutorEventBus();
            if (taskExecutorEventBus.isEmpty()) {
                return;
            }
            Optional<?> headEventOptional = taskExecutorEventBus.poll();
            if (!headEventOptional.isPresent()) {
                return;
            }
            ITaskExecutorLifecycleEvent taskExecutorLifecycleEvent = (ITaskExecutorLifecycleEvent) headEventOptional.get();
            try {
                for (ITaskExecutorLifecycleEventListener listener : this.taskExecutorLifecycleEventListeners) {
                    this.dispatchToListener(listener, taskExecutorLifecycleEvent);
                }
                log.info("Success fire {}: {} ", taskExecutorLifecycleEvent.getClass().getSimpleName(),
                        JSONUtils.toPrettyJsonString(taskExecutorLifecycleEvent));
            } catch (Exception e) {
                // SLF4J treats the trailing Throwable argument as the exception to log.
                log.error("Fire TaskExecutorLifecycleEvent: {} error", taskExecutorLifecycleEvent, e);
            }
        }
    }

    /**
     * Invokes the listener callback that corresponds to the event's type.
     *
     * @param listener the listener to notify
     * @param event    the event to deliver
     * @throws IllegalArgumentException if the event type has no corresponding callback
     */
    private void dispatchToListener(ITaskExecutorLifecycleEventListener listener, ITaskExecutorLifecycleEvent event) {
        switch (event.getType()) {
            case DISPATCHED:
                listener.onTaskExecutorDispatchedLifecycleEvent((TaskExecutorDispatchedLifecycleEvent) event);
                break;
            case RUNNING:
                listener.onTaskExecutorStartedLifecycleEvent((TaskExecutorStartedLifecycleEvent) event);
                break;
            case RUNTIME_CONTEXT_CHANGE:
                listener.onTaskExecutorRuntimeContextChangedEvent((TaskExecutorRuntimeContextChangedLifecycleEvent) event);
                break;
            case PAUSE:
                listener.onTaskExecutorPauseLifecycleEvent((TaskExecutorPauseLifecycleEvent) event);
                break;
            case PAUSED:
                listener.onTaskExecutorPausedLifecycleEvent((TaskExecutorPausedLifecycleEvent) event);
                break;
            case KILL:
                listener.onTaskExecutorKillLifecycleEvent((TaskExecutorKillLifecycleEvent) event);
                break;
            case KILLED:
                listener.onTaskExecutorKilledLifecycleEvent((TaskExecutorKilledLifecycleEvent) event);
                break;
            case SUCCESS:
                listener.onTaskExecutorSuccessLifecycleEvent((TaskExecutorSuccessLifecycleEvent) event);
                break;
            case FAILED:
                listener.onTaskExecutorFailLifecycleEvent((TaskExecutorFailedLifecycleEvent) event);
                break;
            case FINALIZE:
                listener.onTaskExecutorFinalizeLifecycleEvent((TaskExecutorFinalizeLifecycleEvent) event);
                break;
            default:
                throw new IllegalArgumentException("Unsupported TaskExecutorLifecycleEvent: " + event);
        }
    }
}

