/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.pipe.agent.task;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.PipeTask;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.extractor.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeBuilder;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskDataRegionBuilder;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskSchemaRegionBuilder;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipeTaskDataNodeAgent
extends PipeTaskAgent {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskDataNodeAgent.class);
    protected static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();

    protected boolean isShutdown() {
        return PipeAgent.runtime().isShutdown();
    }

    protected Map<TConsensusGroupId, PipeTask> buildPipeTasks(PipeMeta pipeMetaFromConfigNode) {
        return new PipeDataNodeBuilder(pipeMetaFromConfigNode).build();
    }

    public void stopAllPipesWithCriticalException() {
        try {
            int retryCount = 0;
            while (true) {
                if (this.tryWriteLockWithTimeOut(5L)) {
                    try {
                        this.stopAllPipesWithCriticalExceptionInternal();
                        LOGGER.info("Stopped all pipes with critical exception.");
                        return;
                    }
                    finally {
                        this.releaseWriteLock();
                    }
                }
                Thread.sleep(1000L);
                LOGGER.warn("Failed to stop all pipes with critical exception, retry count: {}.", (Object)(++retryCount));
            }
        }
        catch (InterruptedException e) {
            LOGGER.error("Interrupted when trying to stop all pipes with critical exception, exception message: {}", (Object)e.getMessage(), (Object)e);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            LOGGER.error("Failed to stop all pipes with critical exception, exception message: {}", (Object)e.getMessage(), (Object)e);
        }
    }

    private void stopAllPipesWithCriticalExceptionInternal() {
        int currentDataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
        HashMap reusedConnectorParameters2ExceptionMap = new HashMap();
        this.pipeMetaKeeper.getPipeMetaList().forEach(pipeMeta -> {
            PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
            PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
            runtimeMeta.getConsensusGroupId2TaskMetaMap().values().forEach(pipeTaskMeta -> {
                if (pipeTaskMeta.getLeaderDataNodeId() != currentDataNodeId) {
                    return;
                }
                for (PipeRuntimeException e : pipeTaskMeta.getExceptionMessages()) {
                    if (!(e instanceof PipeRuntimeConnectorCriticalException)) continue;
                    reusedConnectorParameters2ExceptionMap.putIfAbsent(staticMeta.getConnectorParameters(), (PipeRuntimeConnectorCriticalException)e);
                }
            });
        });
        this.pipeMetaKeeper.getPipeMetaList().forEach(pipeMeta -> {
            PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
            PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
            runtimeMeta.getConsensusGroupId2TaskMetaMap().values().forEach(pipeTaskMeta -> {
                if (pipeTaskMeta.getLeaderDataNodeId() == currentDataNodeId && reusedConnectorParameters2ExceptionMap.containsKey(staticMeta.getConnectorParameters()) && !pipeTaskMeta.containsExceptionMessage((PipeRuntimeException)reusedConnectorParameters2ExceptionMap.get(staticMeta.getConnectorParameters()))) {
                    PipeRuntimeConnectorCriticalException exception = (PipeRuntimeConnectorCriticalException)reusedConnectorParameters2ExceptionMap.get(staticMeta.getConnectorParameters());
                    pipeTaskMeta.trackExceptionMessage((PipeRuntimeException)exception);
                    LOGGER.warn("Pipe {} (creation time = {}) will be stopped because of critical exception (occurred time {}) in connector {}.", new Object[]{staticMeta.getPipeName(), DateTimeUtils.convertLongToDate(staticMeta.getCreationTime(), "ms"), DateTimeUtils.convertLongToDate(exception.getTimeStamp(), "ms"), staticMeta.getConnectorParameters()});
                }
            });
        });
        this.pipeMetaKeeper.getPipeMetaList().forEach(pipeMeta -> {
            PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
            PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
            if (runtimeMeta.getStatus().get() == PipeStatus.RUNNING) {
                runtimeMeta.getConsensusGroupId2TaskMetaMap().values().forEach(pipeTaskMeta -> {
                    for (PipeRuntimeException e : pipeTaskMeta.getExceptionMessages()) {
                        if (!(e instanceof PipeRuntimeCriticalException)) continue;
                        this.stopPipe(staticMeta.getPipeName(), staticMeta.getCreationTime());
                        LOGGER.warn("Pipe {} (creation time = {}) was stopped because of critical exception (occurred time {}).", new Object[]{staticMeta.getPipeName(), DateTimeUtils.convertLongToDate(staticMeta.getCreationTime(), "ms"), DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms")});
                        return;
                    }
                });
            }
        });
    }

    protected void createPipeTask(TConsensusGroupId consensusGroupId, PipeStaticMeta pipeStaticMeta, PipeTaskMeta pipeTaskMeta) {
        if (consensusGroupId.getType() != TConsensusGroupType.ConfigRegion && pipeTaskMeta.getLeaderDataNodeId() == CONFIG.getDataNodeId()) {
            PipeDataNodeTask pipeTask;
            switch (consensusGroupId.getType()) {
                case DataRegion: {
                    pipeTask = new PipeDataNodeTaskDataRegionBuilder(pipeStaticMeta, consensusGroupId, pipeTaskMeta).build();
                    break;
                }
                case SchemaRegion: {
                    pipeTask = new PipeDataNodeTaskSchemaRegionBuilder(pipeStaticMeta, consensusGroupId, pipeTaskMeta).build();
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Unsupported consensus group type: " + consensusGroupId.getType());
                }
            }
            pipeTask.create();
            this.pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, (PipeTask)pipeTask);
        }
        this.pipeMetaKeeper.getPipeMeta(pipeStaticMeta.getPipeName()).getRuntimeMeta().getConsensusGroupId2TaskMetaMap().put(consensusGroupId, pipeTaskMeta);
    }

    public void collectPipeMetaList(TDataNodeHeartbeatResp resp) throws TException {
        if (!this.tryReadLockWithTimeOut(10L)) {
            return;
        }
        try {
            this.collectPipeMetaListInternal(resp);
        }
        finally {
            this.releaseReadLock();
        }
    }

    private void collectPipeMetaListInternal(TDataNodeHeartbeatResp resp) throws TException {
        if (PipeAgent.runtime().isShutdown()) {
            return;
        }
        ArrayList<ByteBuffer> pipeMetaBinaryList = new ArrayList<ByteBuffer>();
        try {
            for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
                pipeMetaBinaryList.add(pipeMeta.serialize());
                LOGGER.info("Reporting pipe meta: {}", (Object)pipeMeta.coreReportMessage());
            }
        }
        catch (IOException e) {
            throw new TException((Throwable)e);
        }
        resp.setPipeMetaList(pipeMetaBinaryList);
    }

    public void collectPipeMetaList(TPipeHeartbeatReq req, TPipeHeartbeatResp resp) throws TException {
        this.acquireReadLock();
        try {
            this.collectPipeMetaListInternal(req, resp);
        }
        finally {
            this.releaseReadLock();
        }
    }

    private void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeatResp resp) throws TException {
        if (PipeAgent.runtime().isShutdown()) {
            return;
        }
        LOGGER.info("Received pipe heartbeat request {} from config node.", (Object)req.heartbeatId);
        ArrayList<ByteBuffer> pipeMetaBinaryList = new ArrayList<ByteBuffer>();
        try {
            for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
                pipeMetaBinaryList.add(pipeMeta.serialize());
                LOGGER.info("Reporting pipe meta: {}", (Object)pipeMeta.coreReportMessage());
            }
        }
        catch (IOException e) {
            throw new TException((Throwable)e);
        }
        resp.setPipeMetaList(pipeMetaBinaryList);
        PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
    }

    public void restartAllStuckPipes() {
        if (!this.tryWriteLockWithTimeOut(5L)) {
            return;
        }
        try {
            this.restartAllStuckPipesInternal();
        }
        finally {
            this.releaseWriteLock();
        }
    }

    private void restartAllStuckPipesInternal() {
        Map<String, IoTDBDataRegionExtractor> taskId2ExtractorMap = PipeExtractorMetrics.getInstance().getExtractorMap();
        HashSet<PipeMeta> stuckPipes = new HashSet<PipeMeta>();
        for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
            String pipeName = pipeMeta.getStaticMeta().getPipeName();
            List extractors = taskId2ExtractorMap.values().stream().filter(e -> e.getPipeName().equals(pipeName)).collect(Collectors.toList());
            if (extractors.isEmpty() || !((IoTDBDataRegionExtractor)extractors.get(0)).isStreamMode() || extractors.stream().noneMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles) || !this.mayMemTablePinnedCountReachDangerousThreshold() && !this.mayWalSizeReachThrottleThreshold()) continue;
            LOGGER.warn("Pipe {} may be stuck.", (Object)pipeMeta.getStaticMeta());
            stuckPipes.add(pipeMeta);
        }
        stuckPipes.parallelStream().forEach(this::restartStuckPipe);
    }

    private boolean mayMemTablePinnedCountReachDangerousThreshold() {
        return PipeResourceManager.wal().getPinnedWalCount() >= 10 * PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
    }

    private boolean mayWalSizeReachThrottleThreshold() {
        return 3L * WALManager.getInstance().getTotalDiskUsage() > 2L * IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
    }

    private void restartStuckPipe(PipeMeta pipeMeta) {
        LOGGER.warn("Pipe {} will be restarted because of stuck.", (Object)pipeMeta.getStaticMeta());
        long startTime = System.currentTimeMillis();
        this.handleDropPipeInternal(pipeMeta.getStaticMeta().getPipeName());
        this.handleSinglePipeMetaChangesInternal(pipeMeta);
        LOGGER.warn("Pipe {} was restarted because of stuck, time cost: {} ms.", (Object)pipeMeta.getStaticMeta(), (Object)(System.currentTimeMillis() - startTime));
    }
}

