/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.common.handler;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.ClientLoadBalance;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.allocate.client.ClientLoadBalanceManager;
import com.aizuda.snailjob.server.common.convert.RegisterNodeInfoConverter;
import com.aizuda.snailjob.server.common.dto.InstanceKey;
import com.aizuda.snailjob.server.common.dto.InstanceLiveInfo;
import com.aizuda.snailjob.server.common.dto.InstanceSelectCondition;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.dto.UpdateClientInfoDTO;
import com.aizuda.snailjob.server.common.rpc.client.grpc.GrpcChannel;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Sets;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class InstanceManager
implements Lifecycle {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(InstanceManager.class);
    private final ServerNodeMapper serverNodeMapper;
    private final Set<ConnectivityState> STATES = Sets.newHashSet((Object[])new ConnectivityState[]{ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.SHUTDOWN});
    private final int timeout = 40;
    private static final ConcurrentHashMap<InstanceKey, InstanceLiveInfo> INSTANCE_MAP = new ConcurrentHashMap();
    private static final ScheduledExecutorService INSTANCE_TIMEOUT_CHECK = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "instance-timeout-check-thread"));

    public void registerOrUpdate(RegisterNodeInfo info) {
        InstanceKey instanceKey = InstanceKey.builder().namespaceId(info.getNamespaceId()).groupName(info.getGroupName()).hostId(info.getHostId()).build();
        this.registerOrUpdate(instanceKey, info);
    }

    public void registerOrUpdate(InstanceKey key, RegisterNodeInfo info) {
        if (Objects.isNull(key) || Objects.isNull(info)) {
            SnailJobLog.LOCAL.error("Illegal registration of instance information", new Object[0]);
            return;
        }
        INSTANCE_MAP.compute(key, (instanceKey, existing) -> {
            ConnectivityState channelState;
            if (existing == null) {
                existing = new InstanceLiveInfo();
                existing.setNodeInfo(info);
                ManagedChannel channel = GrpcChannel.connect(info.getHostIp(), info.getHostPort());
                existing.setAlive(Objects.nonNull(channel));
                existing.setChannel(channel);
                existing.setLastUpdateAt(System.currentTimeMillis());
                return existing;
            }
            if (!existing.isAlive() && this.STATES.contains(channelState = existing.getChannel().getState(true))) {
                SnailJobLog.LOCAL.warn("Node channel state check {}. {}", new Object[]{existing.getNodeInfo().address(), channelState});
                return existing;
            }
            existing.setLastUpdateAt(System.currentTimeMillis());
            existing.setAlive(true);
            return existing;
        });
    }

    public InstanceLiveInfo getInstanceALiveInfoSet(final InstanceKey instanceKey) {
        Set<InstanceLiveInfo> instanceALiveInfoSet = this.getInstanceALiveInfoSet(instanceKey.getNamespaceId(), instanceKey.getGroupName());
        return StreamUtils.filter(instanceALiveInfoSet, (Predicate)new Predicate<InstanceLiveInfo>(){

            @Override
            public boolean test(InstanceLiveInfo instanceLiveInfo) {
                return instanceLiveInfo.getNodeInfo().getHostId().equals(instanceKey.getHostId());
            }
        }).stream().findFirst().orElse(null);
    }

    public Set<InstanceLiveInfo> getInstanceALiveInfoSet(String namespaceId, String groupName, String targetLabels) {
        HashMap targetLabelsMap = StrUtil.isNotBlank((CharSequence)targetLabels) ? JsonUtil.parseHashMap((String)targetLabels) : new HashMap(1);
        Set<InstanceLiveInfo> instanceALiveInfoSet = this.getInstanceALiveInfoSet(namespaceId, groupName);
        HashMap finalTargetLabelsMap = targetLabelsMap;
        return new HashSet<InstanceLiveInfo>(StreamUtils.filter(instanceALiveInfoSet, instanceLiveInfo -> {
            RegisterNodeInfo nodeInfo = instanceLiveInfo.getNodeInfo();
            return this.matchLabels(nodeInfo.getLabelMap(), finalTargetLabelsMap);
        }));
    }

    public Set<InstanceLiveInfo> getInstanceALiveInfoSet(String namespaceId, String groupName, Map<String, String> targetLabels) {
        Set<InstanceLiveInfo> instanceALiveInfoSet = this.getInstanceALiveInfoSet(namespaceId, groupName);
        return new HashSet<InstanceLiveInfo>(StreamUtils.filter(instanceALiveInfoSet, instanceLiveInfo -> {
            RegisterNodeInfo nodeInfo = instanceLiveInfo.getNodeInfo();
            return this.matchLabels(nodeInfo.getLabelMap(), targetLabels);
        }));
    }

    public Set<InstanceLiveInfo> getInstanceALiveInfoSet(String namespaceId, String groupName) {
        Set allPods = this.getAllInstances().stream().filter(instanceLiveInfo -> {
            RegisterNodeInfo nodeInfo = instanceLiveInfo.getNodeInfo();
            return nodeInfo.getGroupName().equals(groupName) && nodeInfo.getNamespaceId().equals(namespaceId);
        }).collect(Collectors.toSet());
        if (CollUtil.isEmpty(allPods)) {
            List serverNodes = this.serverNodeMapper.selectList((Wrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().eq(ServerNode::getNamespaceId, (Object)namespaceId)).eq(ServerNode::getGroupName, (Object)groupName));
            if (CollUtil.isEmpty((Collection)serverNodes)) {
                return Sets.newHashSet();
            }
            for (ServerNode node : serverNodes) {
                this.registerOrUpdate(RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo(node));
            }
            return this.getInstanceALiveInfoSet(namespaceId, groupName);
        }
        return allPods.stream().filter(InstanceLiveInfo::isAlive).collect(Collectors.toSet());
    }

    public Set<InstanceLiveInfo> getAllInstances() {
        Collection<InstanceLiveInfo> values = INSTANCE_MAP.values();
        return new TreeSet<InstanceLiveInfo>(values);
    }

    public InstanceLiveInfo getALiveInstanceByRouteKey(InstanceSelectCondition conditionDTO) {
        Set<InstanceLiveInfo> instanceLiveInfos = this.getInstanceALiveInfoSet(conditionDTO.getNamespaceId(), conditionDTO.getGroupName());
        if (CollUtil.isEmpty(instanceLiveInfos)) {
            SnailJobLog.LOCAL.warn("client node is null. groupName:[{}]", new Object[]{conditionDTO.getGroupName()});
            return null;
        }
        Set registerNodeInfos = instanceLiveInfos.stream().map(InstanceLiveInfo::getNodeInfo).filter(node -> this.matchLabels(node.getLabelMap(), conditionDTO.getTargetLabels())).collect(Collectors.toSet());
        ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(conditionDTO.getRouteKey());
        String hostId = clientLoadBalanceRandom.route(conditionDTO.getAllocKey(), new TreeSet<String>(StreamUtils.toSet(registerNodeInfos, RegisterNodeInfo::getHostId)));
        Stream<InstanceLiveInfo> registerNodeInfoStream = instanceLiveInfos.stream().filter(s -> s.getNodeInfo().getHostId().equals(hostId));
        return registerNodeInfoStream.findFirst().orElse(null);
    }

    public InstanceLiveInfo getInstance(InstanceKey key) {
        InstanceLiveInfo instanceLiveInfo = INSTANCE_MAP.get(key);
        if (Objects.isNull(instanceLiveInfo)) {
            List serverNodes = this.serverNodeMapper.selectList((Wrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().eq(ServerNode::getNamespaceId, (Object)key.getNamespaceId())).eq(ServerNode::getGroupName, (Object)key.getGroupName())).eq(ServerNode::getHostId, (Object)key.getHostId())).orderByDesc(ServerNode::getExpireAt));
            if (CollUtil.isEmpty((Collection)serverNodes)) {
                return null;
            }
            this.registerOrUpdate(RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo((ServerNode)serverNodes.get(0)));
            return INSTANCE_MAP.get(key);
        }
        return instanceLiveInfo;
    }

    public void remove(InstanceKey key) {
        INSTANCE_MAP.remove(key);
    }

    @Override
    public void start() {
        INSTANCE_TIMEOUT_CHECK.scheduleAtFixedRate(() -> {
            try {
                long now = DateUtils.toNowMilli();
                for (Map.Entry<InstanceKey, InstanceLiveInfo> entry : INSTANCE_MAP.entrySet()) {
                    InstanceLiveInfo info = entry.getValue();
                    ManagedChannel channel = info.getChannel();
                    ConnectivityState channelState = channel.getState(!info.isAlive());
                    if (this.STATES.contains(channelState)) {
                        SnailJobLog.LOCAL.warn("Node channel state check {}. {}", new Object[]{info.getNodeInfo().address(), channelState});
                        info.setAlive(Boolean.FALSE);
                    } else {
                        info.setAlive(Boolean.TRUE);
                    }
                    if (now - info.getLastUpdateAt() <= TimeUnit.SECONDS.toMillis(40L) && !channel.isShutdown() && !channel.isTerminated()) continue;
                    SnailJobLog.LOCAL.info("Node {} is offline. Removing...", new Object[]{info.getNodeInfo().address()});
                    INSTANCE_MAP.remove(entry.getKey());
                    channel.shutdown();
                }
            }
            catch (Exception e) {
                SnailJobLog.LOCAL.error("instance timeout check is error", new Object[]{e});
            }
        }, 0L, SystemConstants.SCHEDULE_PERIOD, TimeUnit.SECONDS);
    }

    @Override
    public void close() {
    }

    private boolean matchLabels(Map<String, String> nodeLabels, Map<String, String> targetLabels) {
        if (CollUtil.isEmpty(nodeLabels)) {
            return true;
        }
        targetLabels.put((String)SystemConstants.DEFAULT_LABEL.getKey(), (String)SystemConstants.DEFAULT_LABEL.getValue());
        for (Map.Entry<String, String> entry : targetLabels.entrySet()) {
            if (entry.getValue().equals(nodeLabels.get(entry.getKey()))) continue;
            return false;
        }
        return true;
    }

    public void updateInstanceLabels(UpdateClientInfoDTO clientInfoDTO) {
        if (clientInfoDTO == null) {
            return;
        }
        InstanceKey instanceKey = InstanceKey.builder().namespaceId(clientInfoDTO.getNamespaceId()).groupName(clientInfoDTO.getGroupName()).hostId(clientInfoDTO.getHostId()).build();
        InstanceLiveInfo instanceLiveInfo = INSTANCE_MAP.get(instanceKey);
        if (Objects.isNull(instanceLiveInfo)) {
            ServerNode serverNode = (ServerNode)this.serverNodeMapper.selectOne((Wrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().eq(ServerNode::getNamespaceId, (Object)clientInfoDTO.getNamespaceId())).eq(ServerNode::getGroupName, (Object)clientInfoDTO.getGroupName())).eq(ServerNode::getHostId, (Object)clientInfoDTO.getHostId())).eq(ServerNode::getHostIp, (Object)clientInfoDTO.getHostIp()));
            if (Objects.isNull(serverNode)) {
                return;
            }
            this.registerOrUpdate(RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo(serverNode));
            return;
        }
        if (StrUtil.isNotBlank((CharSequence)clientInfoDTO.getLabels())) {
            instanceLiveInfo.getNodeInfo().setLabels(clientInfoDTO.getLabels());
            instanceLiveInfo.getNodeInfo().setLabelMap(JsonUtil.parseHashMap((String)clientInfoDTO.getLabels()));
        }
    }

    @Generated
    public InstanceManager(ServerNodeMapper serverNodeMapper) {
        this.serverNodeMapper = serverNodeMapper;
    }
}

