/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.pipe.processor.twostage.exchange.sender;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.db.pipe.processor.twostage.combiner.PipeCombineHandlerManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TwoStageAggregateSender
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageAggregateSender.class);
    private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
    private final String pipeName;
    private final long creationTime;
    private static final AtomicLong DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME = new AtomicLong(0L);
    private static final AtomicReference<Map<Integer, TEndPoint>> DATANODE_ID_2_END_POINTS = new AtomicReference();
    private TEndPoint[] endPoints;
    private final Map<TEndPoint, IoTDBSyncClient> endPointIoTDBSyncClientMap = new ConcurrentHashMap<TEndPoint, IoTDBSyncClient>();

    public TwoStageAggregateSender(String pipeName, long creationTime) {
        this.pipeName = pipeName;
        this.creationTime = creationTime;
    }

    public synchronized TPipeTransferResp request(long watermark, TPipeTransferReq req) throws TException {
        boolean endPointsChanged = TwoStageAggregateSender.tryFetchEndPointsIfNecessary();
        this.tryConstructClients(endPointsChanged);
        TEndPoint endPoint = this.endPoints[(int)watermark % this.endPoints.length];
        IoTDBSyncClient client = this.endPointIoTDBSyncClientMap.get(endPoint);
        if (client == null) {
            client = this.reconstructIoTDBSyncClient(endPoint);
        }
        LOGGER.info("Sending request {} (watermark = {}) to {}", new Object[]{req, watermark, endPoint});
        try {
            return client.pipeTransfer(req);
        }
        catch (Exception e) {
            LOGGER.warn("Failed to send request {} (watermark = {}) to {}", new Object[]{req, watermark, endPoint, e});
            try {
                this.reconstructIoTDBSyncClient(endPoint);
            }
            catch (Exception ex) {
                LOGGER.warn("Failed to reconstruct IoTDBSyncClient {} after failure to send request {} (watermark = {})", new Object[]{endPoint, req, watermark, ex});
            }
            throw e;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static boolean tryFetchEndPointsIfNecessary() {
        long currentTime = System.currentTimeMillis();
        if (DATANODE_ID_2_END_POINTS.get() != null && currentTime - DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.get() < PIPE_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs()) {
            return false;
        }
        AtomicReference<Map<Integer, TEndPoint>> atomicReference = DATANODE_ID_2_END_POINTS;
        synchronized (atomicReference) {
            if (DATANODE_ID_2_END_POINTS.get() != null && currentTime - DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.get() < PIPE_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs()) {
                return false;
            }
            HashMap<Integer, TEndPoint> dataNodeId2EndPointMap = new HashMap<Integer, TEndPoint>();
            try (ConfigNodeClient configNodeClient = (ConfigNodeClient)ConfigNodeClientManager.getInstance().borrowClient((Object)ConfigNodeInfo.CONFIG_REGION_ID);){
                TShowDataNodesResp showDataNodesResp = configNodeClient.showDataNodes();
                if (showDataNodesResp == null || showDataNodesResp.getDataNodesInfoList() == null) {
                    throw new PipeException("Failed to fetch data nodes");
                }
                for (TDataNodeInfo dataNodeInfo : showDataNodesResp.getDataNodesInfoList()) {
                    dataNodeId2EndPointMap.put(dataNodeInfo.getDataNodeId(), new TEndPoint(dataNodeInfo.getRpcAddresss(), dataNodeInfo.getRpcPort()));
                }
            }
            catch (ClientManagerException | TException e) {
                throw new PipeException("Failed to fetch data nodes", e);
            }
            if (dataNodeId2EndPointMap.isEmpty()) {
                throw new PipeException("No data nodes' endpoints fetched");
            }
            DATANODE_ID_2_END_POINTS.set(dataNodeId2EndPointMap);
            DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME.set(currentTime);
        }
        LOGGER.info("Data nodes' endpoints for two-stage aggregation: {}", DATANODE_ID_2_END_POINTS);
        return true;
    }

    private void tryConstructClients(boolean endPointsChanged) {
        if (Objects.nonNull(this.endPoints) && !endPointsChanged) {
            return;
        }
        Set<Integer> expectedDataNodeIdSet = PipeCombineHandlerManager.getInstance().getExpectedDataNodeIdSet(this.pipeName, this.creationTime);
        if (expectedDataNodeIdSet.isEmpty()) {
            throw new PipeException("No expected region id set fetched");
        }
        this.endPoints = (TEndPoint[])DATANODE_ID_2_END_POINTS.get().entrySet().stream().filter(entry -> expectedDataNodeIdSet.contains(entry.getKey())).map(Map.Entry::getValue).toArray(TEndPoint[]::new);
        LOGGER.info("End points for two-stage aggregation pipe (pipeName={}, creationTime={}) were updated to {}", new Object[]{this.pipeName, this.creationTime, this.endPoints});
        for (TEndPoint endPoint : this.endPoints) {
            if (this.endPointIoTDBSyncClientMap.containsKey(endPoint)) continue;
            try {
                this.endPointIoTDBSyncClientMap.put(endPoint, this.constructIoTDBSyncClient(endPoint));
            }
            catch (TTransportException e) {
                LOGGER.warn("Failed to construct IoTDBSyncClient", (Throwable)e);
            }
        }
        for (TEndPoint endPoint : new HashSet<TEndPoint>(this.endPointIoTDBSyncClientMap.keySet())) {
            if (DATANODE_ID_2_END_POINTS.get().containsValue(endPoint)) continue;
            try {
                this.endPointIoTDBSyncClientMap.remove(endPoint).close();
            }
            catch (Exception e) {
                LOGGER.warn("Failed to close IoTDBSyncClient", (Throwable)e);
            }
        }
    }

    private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint) throws TTransportException {
        IoTDBSyncClient oldClient = this.endPointIoTDBSyncClientMap.remove(endPoint);
        if (oldClient != null) {
            try {
                oldClient.close();
            }
            catch (Exception e) {
                LOGGER.warn("Failed to close old IoTDBSyncClient", (Throwable)e);
            }
        }
        IoTDBSyncClient newClient = this.constructIoTDBSyncClient(endPoint);
        this.endPointIoTDBSyncClientMap.put(endPoint, newClient);
        return newClient;
    }

    private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TTransportException {
        return new IoTDBSyncClient(new ThriftClientProperty.Builder().setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs()).setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled()).build(), endPoint.getIp(), endPoint.getPort(), false, null, null);
    }

    @Override
    public void close() {
        for (IoTDBSyncClient client : this.endPointIoTDBSyncClientMap.values()) {
            try {
                client.close();
            }
            catch (Exception e) {
                LOGGER.warn("Failed to close IoTDBSyncClient", (Throwable)e);
            }
        }
        this.endPointIoTDBSyncClientMap.clear();
        this.endPoints = null;
    }
}

