/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.confignode.manager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSender;
import org.apache.iotdb.common.rpc.thrift.TServiceProvider;
import org.apache.iotdb.common.rpc.thrift.TServiceType;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.TestConnectionUtils;
import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
import org.apache.iotdb.confignode.client.async.CnToCnInternalServiceAsyncRequestManager;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
import org.apache.iotdb.confignode.client.async.handlers.ConfigNodeAsyncRequestContext;
import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateClusterIdPlan;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.persistence.ClusterInfo;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class);
    private final IManager configManager;
    private final ClusterInfo clusterInfo;
    public static final String CONSENSUS_WRITE_ERROR = "Failed in the write API executing the consensus layer due to: ";

    public ClusterManager(IManager configManager, ClusterInfo clusterInfo) {
        this.configManager = configManager;
        this.clusterInfo = clusterInfo;
    }

    public void checkClusterId() {
        if (this.clusterInfo.getClusterId() != null) {
            LOGGER.info("clusterID: {}", (Object)this.clusterInfo.getClusterId());
            return;
        }
        this.generateClusterId();
    }

    public String getClusterId() {
        return this.clusterInfo.getClusterId();
    }

    public String getClusterIdWithRetry(long maxWaitTime) {
        long startTime = System.currentTimeMillis();
        while (this.clusterInfo.getClusterId() == null && System.currentTimeMillis() - startTime < maxWaitTime) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.warn("Unexpected interruption during waiting for get cluster id.");
                break;
            }
        }
        return this.clusterInfo.getClusterId();
    }

    private void generateClusterId() {
        String clusterId = String.valueOf(UUID.randomUUID());
        UpdateClusterIdPlan updateClusterIdPlan = new UpdateClusterIdPlan(clusterId);
        try {
            this.configManager.getConsensusManager().write(updateClusterIdPlan);
        }
        catch (ConsensusException e) {
            LOGGER.warn(CONSENSUS_WRITE_ERROR, (Throwable)e);
        }
    }

    public TTestConnectionResp submitTestConnectionTaskToEveryNode() {
        TTestConnectionResp resp = new TTestConnectionResp();
        resp.resultList = new ArrayList();
        resp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
        TNodeLocations nodeLocations = new TNodeLocations();
        nodeLocations.setConfigNodeLocations(this.configManager.getNodeManager().getRegisteredConfigNodes());
        nodeLocations.setDataNodeLocations(this.configManager.getNodeManager().getRegisteredDataNodes().stream().map(TDataNodeConfiguration::getLocation).collect(Collectors.toList()));
        Map<Integer, TConfigNodeLocation> configNodeLocationMap = this.configManager.getNodeManager().getRegisteredConfigNodes().stream().collect(Collectors.toMap(TConfigNodeLocation::getConfigNodeId, location -> location));
        ConfigNodeAsyncRequestContext configNodeAsyncRequestContext = new ConfigNodeAsyncRequestContext(CnToCnNodeRequestType.SUBMIT_TEST_CONNECTION_TASK, nodeLocations, configNodeLocationMap);
        CnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(configNodeAsyncRequestContext);
        Map<Integer, TConfigNodeLocation> anotherConfigNodeLocationMap = this.configManager.getNodeManager().getRegisteredConfigNodes().stream().collect(Collectors.toMap(TConfigNodeLocation::getConfigNodeId, location -> location));
        configNodeAsyncRequestContext.getResponseMap().forEach((nodeId, configNodeResp) -> {
            if (configNodeResp.isSetResultList()) {
                resp.getResultList().addAll(configNodeResp.getResultList());
            } else {
                resp.getResultList().addAll(this.badConfigNodeConnectionResult(configNodeResp.getStatus(), (TConfigNodeLocation)anotherConfigNodeLocationMap.get(nodeId), nodeLocations));
            }
        });
        Map<Integer, TDataNodeLocation> dataNodeLocationMap = this.configManager.getNodeManager().getRegisteredDataNodes().stream().map(TDataNodeConfiguration::getLocation).collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, location -> location));
        DataNodeAsyncRequestContext dataNodeAsyncRequestContext = new DataNodeAsyncRequestContext(CnToDnAsyncRequestType.SUBMIT_TEST_CONNECTION_TASK, nodeLocations, dataNodeLocationMap);
        CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(dataNodeAsyncRequestContext);
        Map<Integer, TDataNodeLocation> anotherDataNodeLocationMap = this.configManager.getNodeManager().getRegisteredDataNodes().stream().map(TDataNodeConfiguration::getLocation).collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, location -> location));
        dataNodeAsyncRequestContext.getResponseMap().forEach((nodeId, dataNodeResp) -> {
            if (dataNodeResp.isSetResultList()) {
                resp.getResultList().addAll(dataNodeResp.getResultList());
            } else {
                resp.getResultList().addAll(this.badDataNodeConnectionResult(dataNodeResp.getStatus(), (TDataNodeLocation)anotherDataNodeLocationMap.get(nodeId), nodeLocations));
            }
        });
        return resp;
    }

    public TTestConnectionResp doConnectionTest(TNodeLocations nodeLocations) {
        return new TTestConnectionResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), Stream.of(this.testAllConfigNodeConnection(nodeLocations.getConfigNodeLocations()), this.testAllDataNodeConnection(nodeLocations.getDataNodeLocations())).flatMap(Collection::stream).collect(Collectors.toList()));
    }

    private List<TTestConnectionResult> testAllConfigNodeConnection(List<TConfigNodeLocation> configNodeLocations) {
        TSender sender = new TSender().setConfigNodeLocation(ConfigNodeDescriptor.getInstance().getConf().generateLocalConfigNodeLocation());
        return TestConnectionUtils.testConnectionsImpl(configNodeLocations, (TSender)sender, TConfigNodeLocation::getConfigNodeId, TConfigNodeLocation::getInternalEndPoint, (TServiceType)TServiceType.ConfigNodeInternalService, (Object)((Object)CnToCnNodeRequestType.TEST_CONNECTION), handler -> CnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest((AsyncRequestContext)handler));
    }

    private List<TTestConnectionResult> badConfigNodeConnectionResult(TSStatus badStatus, TConfigNodeLocation sourceConfigNode, TNodeLocations nodeLocations) {
        TSender sender = new TSender().setConfigNodeLocation(sourceConfigNode);
        return this.badNodeConnectionResult(badStatus, nodeLocations, sender);
    }

    private List<TTestConnectionResult> testAllDataNodeConnection(List<TDataNodeLocation> dataNodeLocations) {
        TSender sender = new TSender().setConfigNodeLocation(ConfigNodeDescriptor.getInstance().getConf().generateLocalConfigNodeLocation());
        return TestConnectionUtils.testConnectionsImpl(dataNodeLocations, (TSender)sender, TDataNodeLocation::getDataNodeId, TDataNodeLocation::getInternalEndPoint, (TServiceType)TServiceType.DataNodeInternalService, (Object)((Object)CnToDnAsyncRequestType.TEST_CONNECTION), handler -> CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest((AsyncRequestContext)handler));
    }

    private List<TTestConnectionResult> badDataNodeConnectionResult(TSStatus badStatus, TDataNodeLocation sourceDataNode, TNodeLocations nodeLocations) {
        TSender sender = new TSender().setDataNodeLocation(sourceDataNode);
        return this.badNodeConnectionResult(badStatus, nodeLocations, sender);
    }

    private List<TTestConnectionResult> badNodeConnectionResult(TSStatus badStatus, TNodeLocations nodeLocations, TSender sender) {
        String errorMessage = "ConfigNode leader cannot connect to the sender: " + badStatus.getMessage();
        ArrayList<TTestConnectionResult> results = new ArrayList<TTestConnectionResult>();
        nodeLocations.getConfigNodeLocations().forEach(location -> {
            TEndPoint endPoint = location.getInternalEndPoint();
            TServiceProvider serviceProvider = new TServiceProvider(endPoint, TServiceType.ConfigNodeInternalService);
            TTestConnectionResult result = new TTestConnectionResult().setServiceProvider(serviceProvider).setSender(sender);
            result.setSuccess(false).setReason(errorMessage);
            results.add(result);
        });
        nodeLocations.getDataNodeLocations().forEach(location -> {
            TEndPoint endPoint = location.getInternalEndPoint();
            TServiceProvider serviceProvider = new TServiceProvider(endPoint, TServiceType.DataNodeInternalService);
            TTestConnectionResult result = new TTestConnectionResult().setServiceProvider(serviceProvider).setSender(sender);
            result.setSuccess(false).setReason(errorMessage);
            results.add(result);
        });
        if (sender.isSetDataNodeLocation()) {
            nodeLocations.getDataNodeLocations().forEach(location -> {
                TEndPoint endPoint = location.getMPPDataExchangeEndPoint();
                TServiceProvider serviceProvider = new TServiceProvider(endPoint, TServiceType.DataNodeMPPService);
                TTestConnectionResult result = new TTestConnectionResult().setServiceProvider(serviceProvider).setSender(sender);
                result.setSuccess(false).setReason(errorMessage);
                results.add(result);
            });
            nodeLocations.getDataNodeLocations().forEach(location -> {
                TEndPoint endPoint = location.getClientRpcEndPoint();
                TServiceProvider serviceProvider = new TServiceProvider(endPoint, TServiceType.DataNodeExternalService);
                TTestConnectionResult result = new TTestConnectionResult().setServiceProvider(serviceProvider).setSender(sender);
                result.setSuccess(false).setReason(errorMessage);
                results.add(result);
            });
        }
        return results;
    }
}

