/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.service.slot;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.seatunnel.engine.common.config.server.AllocateStrategy;
import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.server.TaskExecutionService;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SystemLoadInfo;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
import org.apache.seatunnel.engine.server.service.slot.SlotContext;
import org.apache.seatunnel.engine.server.service.slot.SlotService;
import org.apache.seatunnel.engine.server.service.slot.WrongTargetSlotException;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;
import oshi.hardware.HardwareAbstractionLayer;

public class DefaultSlotService
implements SlotService {
    private static final ILogger LOGGER = Logger.getLogger(DefaultSlotService.class);
    private static final long DEFAULT_HEARTBEAT_TIMEOUT = 5000L;
    private static final int SYSTEM_LOAD_SEND_INTERVAL = 2;
    private final NodeEngineImpl nodeEngine;
    private AtomicReference<ResourceProfile> unassignedResource;
    private AtomicReference<ResourceProfile> assignedResource;
    private ConcurrentMap<Integer, SlotProfile> assignedSlots;
    private ConcurrentMap<Integer, SlotProfile> unassignedSlots;
    private ScheduledExecutorService scheduledExecutorService;
    private final SlotServiceConfig config;
    private volatile boolean initStatus;
    private final IdGenerator idGenerator;
    private final TaskExecutionService taskExecutionService;
    private ConcurrentMap<Integer, SlotContext> contexts;
    private String slotServiceSequence;

    public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService, SlotServiceConfig config) {
        this.nodeEngine = nodeEngine;
        this.config = config;
        this.taskExecutionService = taskExecutionService;
        this.idGenerator = new IdGenerator();
    }

    @Override
    public void init() {
        this.initStatus = true;
        this.slotServiceSequence = UUID.randomUUID().toString();
        this.contexts = new ConcurrentHashMap<Integer, SlotContext>();
        this.assignedSlots = new ConcurrentHashMap<Integer, SlotProfile>();
        this.unassignedSlots = new ConcurrentHashMap<Integer, SlotProfile>();
        this.unassignedResource = new AtomicReference<ResourceProfile>(new ResourceProfile());
        this.assignedResource = new AtomicReference<ResourceProfile>(new ResourceProfile());
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, String.format("hz.%s.seaTunnel.slotService.thread", this.nodeEngine.getHazelcastInstance().getName())));
        if (!this.config.isDynamicSlot()) {
            this.initFixedSlots();
        }
        this.unassignedResource.set(this.getNodeResource());
        AtomicInteger systemLoadSendCountDown = new AtomicInteger(2);
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                LOGGER.fine("start send heartbeat to resource manager, this address: " + this.nodeEngine.getClusterService().getThisAddress());
                SystemLoadInfo systemLoadInfo = Optional.of(systemLoadSendCountDown.decrementAndGet()).filter(count -> count == 0 && this.config.getAllocateStrategy() == AllocateStrategy.SYSTEM_LOAD).map(count -> {
                    systemLoadSendCountDown.set(2);
                    SystemLoadInfo info = new SystemLoadInfo();
                    info.setCpuPercentage(this.getCpuPercentage());
                    info.setMemPercentage(this.getMemPercentage());
                    LOGGER.fine("send system load info to master");
                    return info;
                }).orElse(null);
                WorkerProfile workerProfile = this.getWorkerProfile();
                Optional.ofNullable(systemLoadInfo).ifPresent(workerProfile::setSystemLoadInfo);
                this.sendToMaster(new WorkerHeartbeatOperation(workerProfile)).join();
            }
            catch (Exception e) {
                LOGGER.warning("failed send heartbeat to resource manager, will retry later. this address: " + this.nodeEngine.getClusterService().getThisAddress());
            }
        }, 0L, 5000L, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reset() {
        if (!this.initStatus) {
            DefaultSlotService defaultSlotService = this;
            synchronized (defaultSlotService) {
                if (!this.initStatus) {
                    this.close();
                    this.init();
                }
            }
        }
    }

    @Override
    public synchronized SlotAndWorkerProfile requestSlot(long jobId, ResourceProfile resourceProfile) {
        this.initStatus = false;
        SlotProfile profile = this.selectBestMatchSlot(resourceProfile);
        if (profile != null) {
            profile.assign(jobId);
            this.assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::merge);
            this.unassignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::subtract);
            this.unassignedSlots.remove(profile.getSlotID());
            this.assignedSlots.put(profile.getSlotID(), profile);
            this.contexts.computeIfAbsent(profile.getSlotID(), p -> new SlotContext(profile.getSlotID(), this.taskExecutionService));
        }
        LOGGER.fine(String.format("received slot request, jobID: %d, resource profile: %s, return: %s", jobId, resourceProfile, profile));
        return new SlotAndWorkerProfile(this.getWorkerProfile(), profile);
    }

    @Override
    public SlotContext getSlotContext(SlotProfile slotProfile) {
        if (!this.contexts.containsKey(slotProfile.getSlotID())) {
            throw new WrongTargetSlotException("Unknown slot in slot service, slot profile: " + slotProfile);
        }
        return (SlotContext)this.contexts.get(slotProfile.getSlotID());
    }

    @Override
    public synchronized void releaseSlot(long jobId, SlotProfile profile) {
        LOGGER.info(String.format("received slot release request, jobID: %d, slot: %s", jobId, profile));
        if (!this.assignedSlots.containsKey(profile.getSlotID())) {
            throw new WrongTargetSlotException("Not exist this slot in slot service, slot profile: " + profile);
        }
        if (!((SlotProfile)this.assignedSlots.get(profile.getSlotID())).getSequence().equals(profile.getSequence())) {
            throw new WrongTargetSlotException("Wrong slot sequence in profile, slot profile: " + profile);
        }
        if (((SlotProfile)this.assignedSlots.get(profile.getSlotID())).getOwnerJobID() != jobId) {
            throw new WrongTargetSlotException(String.format("The profile %s not belong with job %d", this.assignedSlots.get(profile.getSlotID()), jobId));
        }
        this.assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::subtract);
        this.unassignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::merge);
        profile.unassigned();
        if (!this.config.isDynamicSlot()) {
            this.unassignedSlots.put(profile.getSlotID(), profile);
        }
        this.assignedSlots.remove(profile.getSlotID());
        this.contexts.remove(profile.getSlotID());
    }

    @Override
    public void close() {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
    }

    private SlotProfile selectBestMatchSlot(ResourceProfile profile) {
        if (this.unassignedSlots.isEmpty() && !this.config.isDynamicSlot()) {
            return null;
        }
        if (this.config.isDynamicSlot()) {
            if (this.unassignedResource.get().enoughThan(profile)) {
                return new SlotProfile(this.nodeEngine.getThisAddress(), (int)this.idGenerator.getNextId(), profile, this.slotServiceSequence);
            }
        } else {
            Optional<SlotProfile> result = this.unassignedSlots.values().stream().filter(slot -> slot.getResourceProfile().enoughThan(profile)).min((slot1, slot2) -> {
                if (slot1.getResourceProfile().getHeapMemory().getBytes() != slot2.getResourceProfile().getHeapMemory().getBytes()) {
                    return slot1.getResourceProfile().getHeapMemory().getBytes() - slot2.getResourceProfile().getHeapMemory().getBytes() >= 0L ? 1 : -1;
                }
                return slot1.getResourceProfile().getCpu().getCore() - slot2.getResourceProfile().getCpu().getCore();
            });
            return result.orElse(null);
        }
        return null;
    }

    private void initFixedSlots() {
        long maxMemory = Runtime.getRuntime().maxMemory();
        for (int i = 0; i < this.config.getSlotNum(); ++i) {
            this.unassignedSlots.put(i, new SlotProfile(this.nodeEngine.getThisAddress(), i, new ResourceProfile(CPU.of(0), Memory.of(maxMemory / (long)this.config.getSlotNum())), this.slotServiceSequence));
        }
    }

    @Override
    public synchronized WorkerProfile getWorkerProfile() {
        WorkerProfile workerProfile = new WorkerProfile(this.nodeEngine.getThisAddress());
        workerProfile.setProfile(this.getNodeResource());
        workerProfile.setAssignedSlots(this.assignedSlots.values().toArray(new SlotProfile[0]));
        workerProfile.setUnassignedSlots(this.unassignedSlots.values().toArray(new SlotProfile[0]));
        workerProfile.setUnassignedResource(this.unassignedResource.get());
        workerProfile.setAttributes(this.nodeEngine.getLocalMember().getAttributes());
        workerProfile.setDynamicSlot(this.config.isDynamicSlot());
        return workerProfile;
    }

    private ResourceProfile getNodeResource() {
        return new ResourceProfile(CPU.of(0), Memory.of(Runtime.getRuntime().maxMemory()));
    }

    public <E> InvocationFuture<E> sendToMaster(Operation operation) {
        return NodeEngineUtil.sendOperationToMasterNode((NodeEngine)this.nodeEngine, operation);
    }

    public double getMemPercentage() {
        MemoryMXBean memoryMxBean = ManagementFactory.getMemoryMXBean();
        MemoryUsage heapMemoryUsage = memoryMxBean.getHeapMemoryUsage();
        return (double)heapMemoryUsage.getUsed() / (double)heapMemoryUsage.getMax();
    }

    public double getCpuPercentage() {
        SystemInfo si = new SystemInfo();
        HardwareAbstractionLayer hal = si.getHardware();
        CentralProcessor processor = hal.getProcessor();
        long[] prevTicks = processor.getSystemCpuLoadTicks();
        Thread.sleep(1000L);
        long[] ticks = processor.getSystemCpuLoadTicks();
        long user = ticks[CentralProcessor.TickType.USER.getIndex()] - prevTicks[CentralProcessor.TickType.USER.getIndex()];
        long nice = ticks[CentralProcessor.TickType.NICE.getIndex()] - prevTicks[CentralProcessor.TickType.NICE.getIndex()];
        long sys = ticks[CentralProcessor.TickType.SYSTEM.getIndex()] - prevTicks[CentralProcessor.TickType.SYSTEM.getIndex()];
        long idle = ticks[CentralProcessor.TickType.IDLE.getIndex()] - prevTicks[CentralProcessor.TickType.IDLE.getIndex()];
        long totalCpu = user + nice + sys + idle;
        return (double)(totalCpu - idle) / (double)totalCpu;
    }
}

