/*
 * Decompiled with CFR 0.152.
 */
package tech.powerjob.worker.persistence;

import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.powerjob.common.enhance.SafeRunnable;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.MapUtils;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.persistence.PersistenceServiceManager;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.persistence.TaskPersistenceService;
import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService;
import tech.powerjob.worker.persistence.fs.impl.ExternalTaskFileSystemPersistenceService;
import tech.powerjob.worker.pojo.model.InstanceInfo;

/**
 * {@link TaskPersistenceService} decorator that keeps the number of task records held by the
 * wrapped (runtime DB) persistence service bounded by spilling records to an
 * {@link ExternalTaskPersistenceService}.
 *
 * <p>Behaviour visible in this class:
 * <ul>
 *   <li>Most operations are delegated unchanged to the wrapped DB service.</li>
 *   <li>{@link #batchSave(List)} writes new tasks to the external store instead of the DB once
 *       the DB record counter exceeds {@code maxActiveTaskNum} (only for MAP / MAP_REDUCE
 *       instances).</li>
 *   <li>The first time the external store is needed, a background thread ({@code YuGong}) is
 *       started. Every {@code scheduleRateMs} it moves finished tasks out of the DB and moves
 *       pending tasks from the external store back into the DB.</li>
 *   <li>Statistics and task results are merged from both stores once swap is enabled.</li>
 * </ul>
 *
 * <p>Tunable through system properties {@code powerjob.worker.swap.max-active-task-num} and
 * {@code powerjob.worker.swap.scan-interval}.
 */
public class SwapTaskPersistenceService
implements TaskPersistenceService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SwapTaskPersistenceService.class);
    private final Long instanceId;
    private final long maxActiveTaskNum;
    private final long scheduleRateMs;
    /** Number of records this service believes are currently stored in the DB. */
    private final LongAdder dbRecordNum = new LongAdder();
    /** Number of not-yet-dispatched records currently parked in the external store. */
    private final LongAdder externalPendingRecordNum = new LongAdder();
    /** Number of succeeded records moved out to the external store. */
    private final LongAdder externalSucceedRecordNum = new LongAdder();
    /** Number of failed records moved out to the external store. */
    private final LongAdder externalFailedRecordNum = new LongAdder();
    private final boolean needResult;
    private final boolean canUseSwap;
    private final TaskPersistenceService dbTaskPersistenceService;
    /**
     * Set to {@code true} only after {@link #externalTaskPersistenceService} has been assigned,
     * so a reader that observes {@code true} can safely dereference the service.
     * Volatile because it is written by whichever thread first triggers swap and read by others.
     */
    private volatile boolean swapEnabled;
    private volatile boolean finished = false;
    /** Lazily created external store; volatile so the unsynchronized fast-path read is safe. */
    private volatile ExternalTaskPersistenceService externalTaskPersistenceService;
    /** Only touched by the YuGong thread; {@code -1} means "no empty read is being timed". */
    private long lastExternalPendingEmptyTime = -1L;
    private static final long MAX_EXTERNAL_PENDING_WAIT_TIME = 600000L;
    private static final long DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM = 100000L;
    private static final long DEFAULT_SCHEDULE_TIME = 60000L;
    /** Maximum number of finished tasks fetched from the DB per move-out batch. */
    private static final int MOVE_OUT_BATCH_SIZE = 100;

    /**
     * Creates the swap-aware service and registers it with {@link PersistenceServiceManager}.
     *
     * @param instanceInfo             supplies the instance id and the execute type; swap is only
     *                                 allowed for MAP and MAP_REDUCE, results are only needed for
     *                                 MAP_REDUCE
     * @param dbTaskPersistenceService the wrapped persistence service that all regular operations
     *                                 are delegated to
     * @throws NumberFormatException if one of the tuning system properties is not a valid long
     */
    public SwapTaskPersistenceService(InstanceInfo instanceInfo, TaskPersistenceService dbTaskPersistenceService) {
        this.instanceId = instanceInfo.getInstanceId();
        String executeType = instanceInfo.getExecuteType();
        boolean mapReduce = ExecuteType.MAP_REDUCE.name().equalsIgnoreCase(executeType);
        this.needResult = mapReduce;
        this.canUseSwap = ExecuteType.MAP.name().equalsIgnoreCase(executeType) || mapReduce;
        this.dbTaskPersistenceService = dbTaskPersistenceService;
        this.maxActiveTaskNum = Long.parseLong(System.getProperty("powerjob.worker.swap.max-active-task-num", String.valueOf(DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM)));
        this.scheduleRateMs = Long.parseLong(System.getProperty("powerjob.worker.swap.scan-interval", String.valueOf(DEFAULT_SCHEDULE_TIME)));
        PersistenceServiceManager.register(this.instanceId, this);
        log.info("[SwapTaskPersistenceService-{}] initialized SwapTaskPersistenceService, canUseSwap: {}, needResult: {}, maxActiveTaskNum: {}, scheduleRateMs: {}", this.instanceId, this.canUseSwap, this.needResult, this.maxActiveTaskNum, this.scheduleRateMs);
    }

    /** No-op: all initialization happens in the constructor. */
    @Override
    public void init() throws Exception {
    }

    /** Delegates to the wrapped DB service. */
    @Override
    public boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity) {
        return this.dbTaskPersistenceService.updateTask(instanceId, taskId, updateEntity);
    }

    /** Delegates to the wrapped DB service. */
    @Override
    public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) {
        return this.dbTaskPersistenceService.updateTaskStatus(instanceId, taskId, status, lastReportTime, result);
    }

    /** Delegates to the wrapped DB service. */
    @Override
    public boolean updateLostTasks(Long instanceId, List<String> addressList, boolean retry) {
        return this.dbTaskPersistenceService.updateLostTasks(instanceId, addressList, retry);
    }

    /** Delegates to the wrapped DB service. */
    @Override
    public Optional<TaskDO> getLastTask(Long instanceId, Long subInstanceId) {
        return this.dbTaskPersistenceService.getLastTask(instanceId, subInstanceId);
    }

    /** Delegates to the wrapped DB service. */
    @Override
    public List<TaskDO> getAllUnFinishedTaskByAddress(Long instanceId, String address) {
        return this.dbTaskPersistenceService.getAllUnFinishedTaskByAddress(instanceId, address);
    }

    /** Delegates to the wrapped DB service. */
    @Override
    public List<TaskDO> getTaskByStatus(Long instanceId, TaskStatus status, int limit) {
        return this.dbTaskPersistenceService.getTaskByStatus(instanceId, status, limit);
    }

    /** Delegates to the wrapped DB service. */
    @Override
    public List<TaskDO> getTaskByQuery(Long instanceId, String customQuery) {
        return this.dbTaskPersistenceService.getTaskByQuery(instanceId, customQuery);
    }

    /** Delegates to the wrapped DB service. */
    @Override
    public Optional<TaskDO> getTask(Long instanceId, String taskId) {
        return this.dbTaskPersistenceService.getTask(instanceId, taskId);
    }

    /** Delegates to the wrapped DB service. */
    @Override
    public boolean deleteAllSubInstanceTasks(Long instanceId, Long subInstanceId) {
        return this.dbTaskPersistenceService.deleteAllSubInstanceTasks(instanceId, subInstanceId);
    }

    /** Delegates to the wrapped DB service. */
    @Override
    public boolean deleteTasksByTaskIds(Long instanceId, Collection<String> taskId) {
        return this.dbTaskPersistenceService.deleteTasksByTaskIds(instanceId, taskId);
    }

    /**
     * Saves a batch of tasks. When swap is allowed and the DB record counter already exceeds
     * {@code maxActiveTaskNum}, the batch is written to the external store as pending tasks;
     * otherwise it is written to the DB.
     *
     * @param tasks tasks to persist
     * @return the result reported by whichever store received the batch
     */
    @Override
    public boolean batchSave(List<TaskDO> tasks) {
        long dbNum = this.dbRecordNum.sum();
        boolean spillToExternal = this.canUseSwap && dbNum > this.maxActiveTaskNum;
        if (!spillToExternal) {
            return this.persistTask2Db(tasks);
        }
        boolean persistPendingTaskRes = this.getExternalTaskPersistenceService().persistPendingTask(tasks);
        if (persistPendingTaskRes) {
            this.externalPendingRecordNum.add(tasks.size());
        }
        log.debug("[SwapTaskPersistenceService-{}] too many tasks at runtime(dbRecordNum: {}), SWAP enabled, persistence result: {}, externalPendingRecordNum: {}", this.instanceId, dbNum, persistPendingTaskRes, this.externalPendingRecordNum);
        return persistPendingTaskRes;
    }

    /**
     * Marks this service as finished (which stops the background mover), closes the external
     * store if swap was enabled (errors ignored), unregisters from
     * {@link PersistenceServiceManager} and deletes all tasks from the DB.
     *
     * @return the result of the DB deletion
     */
    @Override
    public boolean deleteAllTasks(Long instanceId) {
        this.finished = true;
        CommonUtils.executeIgnoreException(() -> {
            if (this.swapEnabled) {
                this.externalTaskPersistenceService.close();
            }
        });
        PersistenceServiceManager.unregister(instanceId);
        return this.dbTaskPersistenceService.deleteAllTasks(instanceId);
    }

    /**
     * Returns the DB statistics; once swap is enabled the WAITING_DISPATCH,
     * WORKER_PROCESS_SUCCESS and WORKER_PROCESS_FAILED counts are increased by the number of
     * records currently tracked in the external store. The map returned by the DB service is
     * updated in place.
     */
    @Override
    public Map<TaskStatus, Long> getTaskStatusStatistics(Long instanceId, Long subInstanceId) {
        Map<TaskStatus, Long> taskStatusStatistics = this.dbTaskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId);
        if (!this.swapEnabled) {
            return taskStatusStatistics;
        }
        long waitingNum = MapUtils.getLongValue(taskStatusStatistics, TaskStatus.WAITING_DISPATCH) + this.externalPendingRecordNum.sum();
        long succeedNum = MapUtils.getLongValue(taskStatusStatistics, TaskStatus.WORKER_PROCESS_SUCCESS) + this.externalSucceedRecordNum.sum();
        long failedNum = MapUtils.getLongValue(taskStatusStatistics, TaskStatus.WORKER_PROCESS_FAILED) + this.externalFailedRecordNum.sum();
        taskStatusStatistics.put(TaskStatus.WAITING_DISPATCH, waitingNum);
        taskStatusStatistics.put(TaskStatus.WORKER_PROCESS_SUCCESS, succeedNum);
        taskStatusStatistics.put(TaskStatus.WORKER_PROCESS_FAILED, failedNum);
        return taskStatusStatistics;
    }

    /**
     * Returns the task results from the DB; once swap is enabled, the finished tasks stored
     * externally are appended by reading the external store batch by batch until it returns an
     * empty batch.
     */
    @Override
    public List<TaskResult> getAllTaskResult(Long instanceId, Long subInstanceId) {
        List<TaskResult> dbTaskResult = this.dbTaskPersistenceService.getAllTaskResult(instanceId, subInstanceId);
        if (!this.swapEnabled) {
            return dbTaskResult;
        }
        List<TaskResult> allTaskResult = Lists.newLinkedList(dbTaskResult);
        List<TaskDO> externalTask = this.externalTaskPersistenceService.readFinishedTask();
        while (!CollectionUtils.isEmpty(externalTask)) {
            for (TaskDO t : externalTask) {
                TaskResult taskResult = new TaskResult();
                taskResult.setTaskId(t.getTaskId());
                taskResult.setSuccess(TaskStatus.WORKER_PROCESS_SUCCESS.getValue() == t.getStatus().intValue());
                taskResult.setResult(t.getResult());
                allTaskResult.add(taskResult);
            }
            externalTask = this.externalTaskPersistenceService.readFinishedTask();
        }
        return allTaskResult;
    }

    /**
     * Writes the tasks to the DB. The DB record counter is increased before the write and is
     * not rolled back when the write reports failure.
     */
    private boolean persistTask2Db(List<TaskDO> taskDOS) {
        this.dbRecordNum.add(taskDOS.size());
        return this.dbTaskPersistenceService.batchSave(taskDOS);
    }

    /**
     * Returns the external store, creating it (and starting the background mover thread) on
     * first use. Creation is guarded by the monitor of this object so it happens at most once.
     */
    private ExternalTaskPersistenceService getExternalTaskPersistenceService() {
        ExternalTaskPersistenceService existing = this.externalTaskPersistenceService;
        if (existing != null) {
            return existing;
        }
        synchronized (this) {
            if (this.externalTaskPersistenceService == null) {
                // Publish the service BEFORE flipping swapEnabled: readers check swapEnabled and
                // then dereference the service without taking the lock.
                this.externalTaskPersistenceService = new ExternalTaskFileSystemPersistenceService(this.instanceId, this.needResult);
                this.swapEnabled = true;
                new Thread(new YuGong(), "PJ-YuGong-" + this.instanceId).start();
            }
            return this.externalTaskPersistenceService;
        }
    }

    /**
     * Background mover: until the enclosing service is finished, periodically moves finished
     * tasks from the DB to the external store and pending tasks from the external store to the DB.
     */
    private class YuGong
    extends SafeRunnable {
        private YuGong() {
        }

        protected void run0() {
            while (!finished) {
                CommonUtils.easySleep(scheduleRateMs);
                // deleteAllTasks() may have run while we slept; it closes the external store,
                // so do not touch it again.
                if (finished) {
                    return;
                }
                this.moveOutFinishedTask();
                this.moveInPendingTask();
            }
        }

        /**
         * Moves pending tasks from the external store into the DB, batch by batch, until there
         * is nothing pending, the DB is over its limit, the external store returns an empty
         * batch, or a DB write fails.
         */
        private void moveInPendingTask() {
            while (true) {
                long pendingNum = externalPendingRecordNum.sum();
                if (pendingNum <= 0L) {
                    lastExternalPendingEmptyTime = -1L;
                    if (pendingNum < 0L) {
                        log.warn("[SwapTaskPersistenceService-{}] externalPendingRecordNum({}) < 0, maybe there's a bug!", instanceId, externalPendingRecordNum);
                    }
                    return;
                }
                if (dbRecordNum.sum() > maxActiveTaskNum) {
                    // DB is still full; no room to move anything in during this round.
                    lastExternalPendingEmptyTime = -1L;
                    return;
                }
                List<TaskDO> taskDOS = getExternalTaskPersistenceService().readPendingTask();
                if (CollectionUtils.isEmpty(taskDOS)) {
                    this.handleEmptyPendingRead();
                    return;
                }
                lastExternalPendingEmptyTime = -1L;
                externalPendingRecordNum.add(-taskDOS.size());
                boolean persistResult = persistTask2Db(taskDOS);
                log.debug("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask size: {}, persistResult: {}, currentDbRecordNum: {}, remainExternalPendingRecordNum: {}", instanceId, taskDOS.size(), persistResult, dbRecordNum, externalPendingRecordNum);
                if (!persistResult) {
                    log.error("[SwapTaskPersistenceService-{}] [moveInPendingTask] moveIn task failed, these tasks are lost: {}", instanceId, taskDOS);
                    return;
                }
            }
        }

        /**
         * Handles the case where the pending counter is positive but the external store returned
         * nothing. Starts (or continues) timing the empty period; once it exceeds
         * {@code MAX_EXTERNAL_PENDING_WAIT_TIME} the pending counter is reset so the reported
         * waiting count no longer includes these records.
         */
        private void handleEmptyPendingRead() {
            if (lastExternalPendingEmptyTime < 0L) {
                lastExternalPendingEmptyTime = System.currentTimeMillis();
            }
            long offset = System.currentTimeMillis() - lastExternalPendingEmptyTime;
            if (offset > MAX_EXTERNAL_PENDING_WAIT_TIME) {
                log.warn("[SwapTaskPersistenceService-{}] [moveInPendingTask] Unable to get tasks from external files for a long time, unexpected things may have happened(lastExternalPendingEmptyTime: {}, offsetFromNow: {}). System will reset externalPendingRecordNum so that the task can end(before reset externalPendingRecordNum: {}).", instanceId, lastExternalPendingEmptyTime, offset, externalPendingRecordNum);
                externalPendingRecordNum.reset();
                return;
            }
            log.debug("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask from external is empty, finished this loop!", instanceId);
        }

        /**
         * Moves finished tasks out of the DB until the DB record counter drops to at most half of
         * {@code maxActiveTaskNum}. Succeeded tasks are moved first; failed tasks only when no
         * succeeded task is left. The round stops early when there is nothing to move or when a
         * batch could not be moved completely (it is retried on the next scheduled run instead of
         * spinning on the same batch).
         */
        private void moveOutFinishedTask() {
            long maxRemainNum = maxActiveTaskNum / 2L;
            while (dbRecordNum.sum() > maxRemainNum) {
                List<TaskDO> succeedTasks = dbTaskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WORKER_PROCESS_SUCCESS, MOVE_OUT_BATCH_SIZE);
                if (!CollectionUtils.isEmpty(succeedTasks)) {
                    if (!this.moveOutDetailFinishedTask(succeedTasks, true)) {
                        return;
                    }
                    continue;
                }
                List<TaskDO> failedTask = dbTaskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WORKER_PROCESS_FAILED, MOVE_OUT_BATCH_SIZE);
                if (CollectionUtils.isEmpty(failedTask)) {
                    return;
                }
                if (!this.moveOutDetailFinishedTask(failedTask, false)) {
                    return;
                }
            }
        }

        /**
         * Writes one batch of finished tasks to the external store and then deletes it from the DB.
         *
         * @param tasks   finished tasks read from the DB
         * @param success {@code true} if the batch holds succeeded tasks, {@code false} for failed
         * @return {@code true} only if the batch was written externally AND removed from the DB;
         *         {@code false} tells the caller to stop the current round
         */
        private boolean moveOutDetailFinishedTask(List<TaskDO> tasks, boolean success) {
            String logKey = String.format("[SwapTaskPersistenceService-%d] [moveOut%sTask] ", instanceId, success ? "Success" : "Failed");
            boolean persistFinishedTask2ExternalResult = getExternalTaskPersistenceService().persistFinishedTask(tasks);
            if (!persistFinishedTask2ExternalResult) {
                // The tasks are NOT safely stored externally: keep them in the DB, otherwise
                // their results would be lost.
                log.warn("{} persistFinishedTask to external failed, skip this stage!", logKey);
                return false;
            }
            LongAdder externalRecord = success ? externalSucceedRecordNum : externalFailedRecordNum;
            int moveOutNum = tasks.size();
            // Count the external copies before deleting from the DB.
            externalRecord.add(moveOutNum);
            List<String> deleteTaskIds = tasks.stream().map(TaskDO::getTaskId).collect(Collectors.toList());
            boolean deleteTasksByTaskIdsResult = dbTaskPersistenceService.deleteTasksByTaskIds(instanceId, deleteTaskIds);
            if (!deleteTasksByTaskIdsResult) {
                log.warn("{} persistFinishedTask to external successfully but delete in runtime failed(movedNum: {}, currentExternalSucceedNum: {}, currentExternalFailedNum: {}, currentDbRecordNum: {}), these taskIds may have duplicate results in reduce stage: {}", logKey, moveOutNum, externalSucceedRecordNum, externalFailedRecordNum, dbRecordNum, deleteTaskIds);
                return false;
            }
            dbRecordNum.add(-moveOutNum);
            log.debug("{} move task to external successfully(movedNum: {}, currentExternalSucceedNum: {}, currentExternalFailedNum: {}, currentDbRecordNum: {})", logKey, moveOutNum, externalSucceedRecordNum, externalFailedRecordNum, dbRecordNum);
            return true;
        }
    }
}

