/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.common.register;

import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.cache.CacheConsumerGroup;
import com.aizuda.snailjob.server.common.client.CommonRpcClient;
import com.aizuda.snailjob.server.common.convert.RegisterNodeInfoConverter;
import com.aizuda.snailjob.server.common.dto.InstanceLiveInfo;
import com.aizuda.snailjob.server.common.dto.PullRemoteNodeClientRegisterInfoDTO;
import com.aizuda.snailjob.server.common.handler.InstanceManager;
import com.aizuda.snailjob.server.common.register.AbstractRegister;
import com.aizuda.snailjob.server.common.register.RegisterContext;
import com.aizuda.snailjob.server.common.register.ServerRegister;
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.common.schedule.AbstractSchedule;
import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;

/**
 * Register for client nodes.
 *
 * <p>Incoming registrations are buffered in a bounded in-memory deque ({@link #QUEUE}). The
 * {@link RefreshNodeSchedule} periodically drains that deque, refreshes the local instance cache,
 * pulls the registrations buffered on the other server nodes and hands the combined list to
 * {@code refreshExpireAt}.
 */
public class ClientRegister extends AbstractRegister {
    public static final String BEAN_NAME = "clientRegister";
    /** Lifetime, in seconds, granted to a client node on every registration/refresh. */
    public static final int DELAY_TIME = 30;
    /** Bounded buffer of pending client registrations (at most 1000 entries). */
    protected static final LinkedBlockingDeque<ServerNode> QUEUE = new LinkedBlockingDeque<>(1000);
    private RefreshNodeSchedule refreshNodeSchedule;

    /**
     * @param type node type code to test
     * @return {@code true} when {@code type} equals the node type handled by this register
     */
    @Override
    public boolean supports(int type) {
        return this.getNodeType().equals(type);
    }

    /** No pre-processing is needed for client registrations. */
    @Override
    protected void beforeProcessor(RegisterContext context) {
    }

    /**
     * @return the current time plus {@link #DELAY_TIME} seconds
     */
    @Override
    protected LocalDateTime getExpireAt() {
        return LocalDateTime.now().plusSeconds(DELAY_TIME);
    }

    /**
     * Buffers the node for the next refresh round.
     *
     * <p>Requests whose URI is {@code "/beat"} are inserted at the head of the deque, every other
     * request at the tail. Since {@link #getExpireNodes()} reads from the head, beat requests are
     * picked up first.
     *
     * @param context    registration context; only its URI is inspected
     * @param serverNode node to buffer
     * @return {@code false} when the deque is full and the node was not accepted
     */
    @Override
    protected boolean doRegister(RegisterContext context, ServerNode serverNode) {
        if ("/beat".equals(context.getUri())) {
            return QUEUE.offerFirst(serverNode);
        }
        return QUEUE.offerLast(serverNode);
    }

    /** No post-processing is needed for client registrations. */
    @Override
    protected void afterProcessor(ServerNode serverNode) {
    }

    /**
     * @return the type code of {@link NodeTypeEnum#CLIENT}
     */
    @Override
    protected Integer getNodeType() {
        return NodeTypeEnum.CLIENT.getType();
    }

    /** Starts the periodic refresh; {@link #setRefreshNodeSchedule} must have been called first. */
    @Override
    public void start() {
        this.refreshNodeSchedule.startScheduler();
    }

    /** Nothing is released here. */
    @Override
    public void close() {
    }

    /**
     * Removes and returns a batch of buffered registrations, without blocking.
     *
     * @return the head element plus up to 256 further elements of {@link #QUEUE}, or {@code null}
     *         when the queue is empty ({@code null} is kept for compatibility with existing
     *         callers, so callers must null-check)
     */
    public static List<ServerNode> getExpireNodes() {
        ServerNode head = QUEUE.poll();
        if (Objects.isNull(head)) {
            return null;
        }
        List<ServerNode> batch = Lists.newArrayList(head);
        QUEUE.drainTo(batch, 256);
        return batch;
    }

    /**
     * Drains the pending registrations and applies them to the local caches.
     *
     * <p>Every drained node gets a fresh expiry ({@link #DELAY_TIME} seconds from now), is
     * registered or updated in the instance manager, and its group/namespace is added to
     * {@link CacheConsumerGroup}.
     *
     * @return the drained nodes, or {@code null} when nothing was pending
     */
    public List<ServerNode> refreshLocalCache() {
        List<ServerNode> expireNodes = ClientRegister.getExpireNodes();
        if (Objects.nonNull(expireNodes)) {
            for (ServerNode serverNode : expireNodes) {
                serverNode.setExpireAt(LocalDateTime.now().plusSeconds(DELAY_TIME));
                this.instanceManager.registerOrUpdate(RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo(serverNode));
                CacheConsumerGroup.addOrUpdate(serverNode.getGroupName(), serverNode.getNamespaceId());
            }
        }
        return expireNodes;
    }

    /**
     * Creates a {@link RefreshNodeSchedule} bound to this register.
     *
     * @param serverNodeMapper mapper handed to the schedule
     * @param instanceManager  instance manager the schedule queries for live server nodes
     * @return a new, not yet started, schedule
     */
    public RefreshNodeSchedule newRefreshNodeSchedule(ServerNodeMapper serverNodeMapper, InstanceManager instanceManager) {
        return new RefreshNodeSchedule(serverNodeMapper, instanceManager);
    }

    @Generated
    public void setRefreshNodeSchedule(RefreshNodeSchedule refreshNodeSchedule) {
        this.refreshNodeSchedule = refreshNodeSchedule;
    }

    /**
     * Periodic task that collects client registrations from this node and from the other live
     * server nodes and passes them to {@code refreshExpireAt}.
     */
    public class RefreshNodeSchedule extends AbstractSchedule {
        /** Pool running the remote pull calls; created in {@link #startScheduler()}. */
        private ThreadPoolExecutor refreshNodePool;
        private final ServerNodeMapper serverNodeMapper;
        private final InstanceManager instanceManager;

        /**
         * Runs one refresh round. Any exception is logged and swallowed so that the next
         * scheduled round still runs.
         */
        @Override
        protected void doExecute() {
            try {
                // Every live server instance except the current one.
                Set<InstanceLiveInfo> remoteServers = this.instanceManager
                        .getInstanceALiveInfoSet("DEFAULT_SERVER_NAMESPACE_ID", "DEFAULT_SERVER")
                        .stream()
                        .filter(info -> !info.getNodeInfo().getHostId().equals(ServerRegister.CURRENT_CID))
                        .collect(Collectors.toSet());

                List<ServerNode> waitRefreshDBClientNodes = new ArrayList<>();
                List<ServerNode> refreshCache = ClientRegister.this.refreshLocalCache();
                if (CollUtil.isNotEmpty(refreshCache)) {
                    waitRefreshDBClientNodes.addAll(refreshCache);
                }
                if (!remoteServers.isEmpty()) {
                    try {
                        List<ServerNode> allClientList = this.pullRemoteNodeClientRegisterInfo(remoteServers);
                        if (CollUtil.isNotEmpty(allClientList)) {
                            waitRefreshDBClientNodes.addAll(allClientList);
                        }
                    }
                    catch (Exception e) {
                        // The local registrations were already drained from the queue above; keep
                        // going so they are still passed to refreshExpireAt.
                        SnailJobLog.LOCAL.error("Pull remote client register info failed", e);
                    }
                }
                if (CollUtil.isEmpty(waitRefreshDBClientNodes)) {
                    SnailJobLog.LOCAL.debug("clientNodes is empty");
                    return;
                }
                SnailJobLog.LOCAL.debug("start refresh client nodes\uff1a{}", waitRefreshDBClientNodes);
                ClientRegister.this.refreshExpireAt(waitRefreshDBClientNodes);
            }
            catch (Exception e) {
                SnailJobLog.LOCAL.error("Refresh failed", e);
            }
        }

        /**
         * Asks every given server node, in parallel, for the client registrations it has buffered.
         *
         * @param infos server nodes to query
         * @return the distinct nodes returned by all servers that answered in time; servers that
         *         fail or time out contribute nothing
         */
        private List<ServerNode> pullRemoteNodeClientRegisterInfo(Set<InstanceLiveInfo> infos) {
            if (CollUtil.isEmpty(infos)) {
                return Lists.newArrayList();
            }
            List<Future<String>> futures = new ArrayList<>(infos.size());
            for (InstanceLiveInfo liveInfo : infos) {
                futures.add(this.refreshNodePool.submit(() -> this.fetchRegisterInfo(liveInfo)));
            }
            return futures.stream()
                    .map(this::awaitNodes)
                    .flatMap(Collection::stream)
                    .distinct()
                    .toList();
        }

        /**
         * Performs the RPC against one server node.
         *
         * @return the JSON payload returned by the server, or {@code ""} when the call failed
         */
        private String fetchRegisterInfo(InstanceLiveInfo liveInfo) {
            try {
                CommonRpcClient serverRpcClient = this.buildRpcClient(liveInfo);
                Result<String> regNodesAndFlush = serverRpcClient.pullRemoteNodeClientRegisterInfo(new PullRemoteNodeClientRegisterInfoDTO());
                return regNodesAndFlush.getData();
            }
            catch (Exception e) {
                // Best effort: one unreachable server must not fail the whole round.
                SnailJobLog.LOCAL.error("Pull client register info from remote node failed", e);
                return "";
            }
        }

        /**
         * Waits at most one second for a pull result and parses it.
         *
         * @return the parsed nodes; an empty list when the payload is absent or blank, the wait
         *         timed out, the task failed, or the payload could not be parsed
         */
        private List<ServerNode> awaitNodes(Future<String> future) {
            try {
                String jsonString = future.get(1L, TimeUnit.SECONDS);
                if (Objects.isNull(jsonString) || jsonString.isBlank()) {
                    return new ArrayList<>();
                }
                List<ServerNode> nodes = JsonUtil.parseList(jsonString, ServerNode.class);
                if (Objects.isNull(nodes)) {
                    return new ArrayList<>();
                }
                return nodes;
            }
            catch (InterruptedException e) {
                // Restore the flag for the caller and stop the now pointless RPC task.
                Thread.currentThread().interrupt();
                future.cancel(true);
                return new ArrayList<>();
            }
            catch (Exception e) {
                // On timeout the task is still running: cancel it so it does not keep a pool
                // thread busy. For an already completed future cancel is a no-op.
                future.cancel(true);
                SnailJobLog.LOCAL.error("Read remote client register info failed", e);
                return new ArrayList<>();
            }
        }

        /** Builds an RPC client for the given server node with fail-retry enabled (3 retries). */
        private CommonRpcClient buildRpcClient(InstanceLiveInfo info) {
            int maxRetryTimes = 3;
            return RequestBuilder.newBuilder()
                    .nodeInfo(info)
                    .failRetry(true)
                    .retryTimes(maxRetryTimes)
                    .client(CommonRpcClient.class)
                    .build();
        }

        @Override
        public String lockName() {
            return "registerNode";
        }

        // NOTE(review): presumably the maximum lock hold time as an ISO-8601 duration — confirm in AbstractSchedule.
        @Override
        public String lockAtMost() {
            return "PT10S";
        }

        // NOTE(review): presumably the minimum lock hold time as an ISO-8601 duration — confirm in AbstractSchedule.
        @Override
        public String lockAtLeast() {
            return "PT5S";
        }

        /**
         * Creates the pull thread pool (4 core / 8 max threads, 1 s keep-alive that also applies
         * to core threads, queue capacity 1000) and schedules {@code execute} with a fixed delay
         * of 5 seconds, starting immediately.
         */
        public void startScheduler() {
            this.refreshNodePool = new ThreadPoolExecutor(4, 8, 1L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000));
            this.refreshNodePool.allowCoreThreadTimeOut(true);
            this.taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S"));
        }

        @Generated
        public RefreshNodeSchedule(ServerNodeMapper serverNodeMapper, InstanceManager instanceManager) {
            this.serverNodeMapper = serverNodeMapper;
            this.instanceManager = instanceManager;
        }
    }
}

