/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.block;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManager;
import org.apache.hadoop.hdds.scm.block.ScmBlockDeletingServiceMetrics;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SCMDeletedBlockTransactionStatusManager {
    private static final Logger LOG = LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
    private final Map<Long, Set<DatanodeID>> transactionToDNsCommitMap;
    private final Map<Long, Integer> transactionToRetryCountMap;
    private final DeletedBlockLogStateManager deletedBlockLogStateManager;
    private final ContainerManager containerManager;
    private final ScmBlockDeletingServiceMetrics metrics;
    private final long scmCommandTimeoutMs;
    private final SCMDeleteBlocksCommandStatusManager scmDeleteBlocksCommandStatusManager;

    public SCMDeletedBlockTransactionStatusManager(DeletedBlockLogStateManager deletedBlockLogStateManager, ContainerManager containerManager, ScmBlockDeletingServiceMetrics metrics, long scmCommandTimeoutMs) {
        this.deletedBlockLogStateManager = deletedBlockLogStateManager;
        this.metrics = metrics;
        this.containerManager = containerManager;
        this.scmCommandTimeoutMs = scmCommandTimeoutMs;
        this.transactionToDNsCommitMap = new ConcurrentHashMap<Long, Set<DatanodeID>>();
        this.transactionToRetryCountMap = new ConcurrentHashMap<Long, Integer>();
        this.scmDeleteBlocksCommandStatusManager = new SCMDeleteBlocksCommandStatusManager(metrics);
    }

    public void incrementRetryCount(List<Long> txIDs) {
        CompletableFuture.runAsync(() -> txIDs.forEach(tx -> this.transactionToRetryCountMap.compute((Long)tx, (k, v) -> v == null ? 1 : v + 1)));
    }

    public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
        this.scmDeleteBlocksCommandStatusManager.onSent(dnId.getID(), scmCommand.getId());
    }

    Map<DatanodeID, Map<Long, SCMDeleteBlocksCommandStatusManager.CmdStatus>> getCommandStatusByTxId(Set<DatanodeID> dnIds) {
        return this.scmDeleteBlocksCommandStatusManager.getCommandStatusByTxId(dnIds);
    }

    void recordTransactionCreated(DatanodeID dnId, long scmCmdId, Set<Long> dnTxSet) {
        this.scmDeleteBlocksCommandStatusManager.recordScmCommand(SCMDeleteBlocksCommandStatusManager.createScmCmdStatusData(dnId, scmCmdId, dnTxSet));
        dnTxSet.forEach(txId -> this.transactionToDNsCommitMap.putIfAbsent((Long)txId, new LinkedHashSet()));
    }

    public void clear() {
        this.transactionToRetryCountMap.clear();
        this.scmDeleteBlocksCommandStatusManager.clear();
        this.transactionToDNsCommitMap.clear();
    }

    public void cleanAllTimeoutSCMCommand(long timeoutMs) {
        this.scmDeleteBlocksCommandStatusManager.cleanAllTimeoutSCMCommand(timeoutMs);
    }

    void onDatanodeDead(DatanodeID dnId) {
        this.scmDeleteBlocksCommandStatusManager.onDatanodeDead(dnId);
    }

    boolean isDuplication(DatanodeID datanodeID, long tx, Map<DatanodeID, Map<Long, SCMDeleteBlocksCommandStatusManager.CmdStatus>> commandStatus) {
        if (this.alreadyExecuted(datanodeID, tx)) {
            return true;
        }
        return SCMDeletedBlockTransactionStatusManager.inProcessing(datanodeID, tx, commandStatus);
    }

    private boolean alreadyExecuted(DatanodeID dnId, long txId) {
        Set<DatanodeID> dnsWithTransactionCommitted = this.transactionToDNsCommitMap.get(txId);
        return dnsWithTransactionCommitted != null && dnsWithTransactionCommitted.contains(dnId);
    }

    @VisibleForTesting
    public void commitTransactions(List<StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult> transactionResults, DatanodeID dnId) {
        ArrayList<Long> txIDsToBeDeleted = new ArrayList<Long>();
        for (StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult transactionResult : transactionResults) {
            if (this.isTransactionFailed(transactionResult)) {
                this.metrics.incrBlockDeletionTransactionFailureOnDatanodes();
                continue;
            }
            try {
                List containerDns;
                this.metrics.incrBlockDeletionTransactionSuccessOnDatanodes();
                long txID = transactionResult.getTxID();
                Set<DatanodeID> dnsWithCommittedTxn = this.transactionToDNsCommitMap.get(txID);
                ContainerID containerId = ContainerID.valueOf((long)transactionResult.getContainerID());
                if (dnsWithCommittedTxn == null) {
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.debug("Transaction txId: {} commit by Datanode: {} for ContainerId: {} failed. Corresponding entry not found.", new Object[]{txID, dnId, containerId});
                    continue;
                }
                dnsWithCommittedTxn.add(dnId);
                ContainerInfo container = this.containerManager.getContainer(containerId);
                Set<ContainerReplica> replicas = this.containerManager.getContainerReplicas(containerId);
                if (Math.min(replicas.size(), dnsWithCommittedTxn.size()) >= container.getReplicationConfig().getRequiredNodes() && dnsWithCommittedTxn.containsAll(containerDns = replicas.stream().map(ContainerReplica::getDatanodeDetails).map(DatanodeDetails::getID).collect(Collectors.toList()))) {
                    this.transactionToDNsCommitMap.remove(txID);
                    this.transactionToRetryCountMap.remove(txID);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Purging txId: {} from block deletion log", (Object)txID);
                    }
                    txIDsToBeDeleted.add(txID);
                }
                if (!LOG.isDebugEnabled()) continue;
                LOG.debug("Datanode txId: {} ContainerId: {} committed by Datanode: {}", new Object[]{txID, containerId, dnId});
            }
            catch (IOException e) {
                LOG.warn("Could not commit delete block transaction: " + transactionResult.getTxID(), (Throwable)e);
            }
        }
        try {
            this.deletedBlockLogStateManager.removeTransactionsFromDB(txIDsToBeDeleted);
            this.metrics.incrBlockDeletionTransactionCompleted(txIDsToBeDeleted.size());
        }
        catch (IOException e) {
            LOG.warn("Could not commit delete block transactions: " + txIDsToBeDeleted, (Throwable)e);
        }
    }

    @VisibleForTesting
    void commitSCMCommandStatus(List<StorageContainerDatanodeProtocolProtos.CommandStatus> deleteBlockStatus, DatanodeID dnId) {
        this.processSCMCommandStatus(deleteBlockStatus, dnId);
        this.scmDeleteBlocksCommandStatusManager.cleanTimeoutSCMCommand(dnId, this.scmCommandTimeoutMs);
    }

    static boolean inProcessing(DatanodeID dnId, long deletedBlocksTxId, Map<DatanodeID, Map<Long, SCMDeleteBlocksCommandStatusManager.CmdStatus>> commandStatus) {
        Map<Long, SCMDeleteBlocksCommandStatusManager.CmdStatus> deletedBlocksTxStatus = commandStatus.get(dnId);
        return deletedBlocksTxStatus != null && deletedBlocksTxStatus.get(deletedBlocksTxId) != null;
    }

    private void processSCMCommandStatus(List<StorageContainerDatanodeProtocolProtos.CommandStatus> deleteBlockStatus, DatanodeID dnID) {
        HashMap summary = new HashMap();
        deleteBlockStatus.forEach(cmdStatus -> summary.put(cmdStatus.getCmdId(), cmdStatus.getStatus()));
        LOG.debug("CommandStatus {} from Datanode: {} ", summary, (Object)dnID);
        for (Map.Entry entry : summary.entrySet()) {
            StorageContainerDatanodeProtocolProtos.CommandStatus.Status status = (StorageContainerDatanodeProtocolProtos.CommandStatus.Status)entry.getValue();
            this.scmDeleteBlocksCommandStatusManager.updateStatusByDNCommandStatus(dnID, (Long)entry.getKey(), status);
        }
    }

    private boolean isTransactionFailed(StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult result) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got block deletion ACK from datanode, TXIDs {}, success {}", (Object)result.getTxID(), (Object)result.getSuccess());
        }
        if (!result.getSuccess()) {
            LOG.warn("Got failed ACK for TXID {}, prepare to resend the TX in next interval", (Object)result.getTxID());
            return true;
        }
        return false;
    }

    public int getTransactionToDNsCommitMapSize() {
        return this.transactionToDNsCommitMap.size();
    }

    protected static class SCMDeleteBlocksCommandStatusManager {
        private static final Logger LOG = LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
        private final Map<DatanodeID, Map<Long, CmdStatusData>> scmCmdStatusRecord = new ConcurrentHashMap<DatanodeID, Map<Long, CmdStatusData>>();
        private static final CmdStatus DEFAULT_STATUS = CmdStatus.TO_BE_SENT;
        private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT = Collections.singleton(CmdStatus.SENT);
        private ScmBlockDeletingServiceMetrics metrics;

        public SCMDeleteBlocksCommandStatusManager(ScmBlockDeletingServiceMetrics metrics) {
            this.metrics = metrics;
        }

        protected static CmdStatusData createScmCmdStatusData(DatanodeID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
            return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
        }

        protected void recordScmCommand(CmdStatusData statusData) {
            LOG.debug("Record ScmCommand: {}", (Object)statusData);
            this.scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k -> new ConcurrentHashMap()).put(statusData.getScmCmdId(), statusData);
        }

        void onSent(DatanodeID dnId, long scmCmdId) {
            this.updateStatus(dnId, scmCmdId, StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING);
        }

        void onDatanodeDead(DatanodeID dnId) {
            LOG.info("Clean SCMCommand record for Datanode: {}", (Object)dnId);
            this.scmCmdStatusRecord.remove(dnId);
        }

        void updateStatusByDNCommandStatus(DatanodeID dnId, long scmCmdId, StorageContainerDatanodeProtocolProtos.CommandStatus.Status newState) {
            this.updateStatus(dnId, scmCmdId, newState);
        }

        protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
            for (DatanodeID dnId : this.scmCmdStatusRecord.keySet()) {
                for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
                    this.removeTimeoutScmCommand(dnId, this.getScmCommandIds(dnId, status), timeoutMs);
                }
            }
        }

        void cleanTimeoutSCMCommand(DatanodeID dnId, long timeoutMs) {
            for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
                this.removeTimeoutScmCommand(dnId, this.getScmCommandIds(dnId, status), timeoutMs);
            }
        }

        private Set<Long> getScmCommandIds(DatanodeID dnId, CmdStatus status) {
            HashSet<Long> scmCmdIds = new HashSet<Long>();
            Map<Long, CmdStatusData> record = this.scmCmdStatusRecord.get(dnId);
            if (record == null) {
                return scmCmdIds;
            }
            for (CmdStatusData statusData : record.values()) {
                if (!statusData.getStatus().equals((Object)status)) continue;
                scmCmdIds.add(statusData.getScmCmdId());
            }
            return scmCmdIds;
        }

        private Instant getUpdateTime(DatanodeID dnId, long scmCmdId) {
            Map<Long, CmdStatusData> record = this.scmCmdStatusRecord.get(dnId);
            if (record == null || record.get(scmCmdId) == null) {
                return null;
            }
            return record.get(scmCmdId).getUpdateTime();
        }

        private void updateStatus(DatanodeID dnId, long scmCmdId, StorageContainerDatanodeProtocolProtos.CommandStatus.Status newStatus) {
            Map<Long, CmdStatusData> recordForDn = this.scmCmdStatusRecord.get(dnId);
            if (recordForDn == null) {
                LOG.warn("Unknown Datanode: {} Scm Command ID: {} report status {}", new Object[]{dnId, scmCmdId, newStatus});
                return;
            }
            if (recordForDn.get(scmCmdId) == null) {
                LOG.debug("Unknown SCM Command ID: {} Datanode: {} report status {}", new Object[]{scmCmdId, dnId, newStatus});
                return;
            }
            boolean changed = false;
            CmdStatusData statusData = recordForDn.get(scmCmdId);
            CmdStatus oldStatus = statusData.getStatus();
            switch (newStatus) {
                case PENDING: {
                    if (oldStatus != CmdStatus.TO_BE_SENT && oldStatus != CmdStatus.SENT) break;
                    statusData.setStatus(CmdStatus.SENT);
                    changed = true;
                    break;
                }
                case EXECUTED: 
                case FAILED: {
                    if (oldStatus == CmdStatus.SENT) {
                        this.removeScmCommand(dnId, scmCmdId);
                        changed = true;
                    }
                    if (oldStatus != CmdStatus.TO_BE_SENT) break;
                    LOG.error("Received {} status for a command marked TO_BE_SENT. This indicates a potential issue in command handling. SCM Command ID: {}, Datanode: {}, Current status: {}", new Object[]{newStatus, scmCmdId, dnId, oldStatus});
                    this.removeScmCommand(dnId, scmCmdId);
                    changed = true;
                    break;
                }
                default: {
                    LOG.error("Unexpected status from Datanode: {}. SCM Command ID: {} with status: {}.", new Object[]{dnId, scmCmdId, newStatus});
                }
            }
            if (!changed) {
                LOG.warn("Cannot update illegal status for Datanode: {} SCM Command ID: {} status {} by DN report status {}", new Object[]{dnId, scmCmdId, oldStatus, newStatus});
            } else {
                LOG.debug("Successful update Datanode: {} SCM Command ID: {} status From {} to {}, DN report status {}", new Object[]{dnId, scmCmdId, oldStatus, statusData.getStatus(), newStatus});
            }
        }

        private void removeTimeoutScmCommand(DatanodeID dnId, Set<Long> scmCmdIds, long timeoutMs) {
            Instant now = Instant.now();
            for (Long scmCmdId : scmCmdIds) {
                Instant updateTime = this.getUpdateTime(dnId, scmCmdId);
                if (updateTime == null || Duration.between(updateTime, now).toMillis() <= timeoutMs) continue;
                CmdStatusData state = this.removeScmCommand(dnId, scmCmdId);
                this.metrics.incrDNCommandsTimeout(dnId, 1L);
                LOG.warn("SCM BlockDeletionCommand {} for Datanode: {} was removed after {}ms without update", new Object[]{state, dnId, timeoutMs});
            }
        }

        private CmdStatusData removeScmCommand(DatanodeID dnId, long scmCmdId) {
            Map<Long, CmdStatusData> record = this.scmCmdStatusRecord.get(dnId);
            if (record == null || record.get(scmCmdId) == null) {
                return null;
            }
            CmdStatusData statusData = record.remove(scmCmdId);
            LOG.debug("Remove ScmCommand {} for Datanode: {} ", (Object)statusData, (Object)dnId);
            return statusData;
        }

        Map<DatanodeID, Map<Long, CmdStatus>> getCommandStatusByTxId(Set<DatanodeID> dnIds) {
            HashMap<DatanodeID, Map<Long, CmdStatus>> result = new HashMap<DatanodeID, Map<Long, CmdStatus>>(this.scmCmdStatusRecord.size());
            for (DatanodeID dnId : dnIds) {
                Map<Long, CmdStatusData> record = this.scmCmdStatusRecord.get(dnId);
                if (record == null) continue;
                HashMap<Long, CmdStatus> dnStatusMap = new HashMap<Long, CmdStatus>();
                for (CmdStatusData statusData : record.values()) {
                    CmdStatus status = statusData.getStatus();
                    for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) {
                        dnStatusMap.put(deletedBlocksTxId, status);
                    }
                }
                result.put(dnId, dnStatusMap);
            }
            return result;
        }

        private void clear() {
            this.scmCmdStatusRecord.clear();
        }

        @VisibleForTesting
        Map<DatanodeID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
            return this.scmCmdStatusRecord;
        }

        public static enum CmdStatus {
            TO_BE_SENT,
            SENT;

        }

        protected static final class CmdStatusData {
            private final DatanodeID dnId;
            private final long scmCmdId;
            private final Set<Long> deletedBlocksTxIds;
            private Instant updateTime;
            private CmdStatus status;

            private CmdStatusData(DatanodeID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
                this.dnId = dnId;
                this.scmCmdId = scmTxID;
                this.deletedBlocksTxIds = deletedBlocksTxIds;
                this.setStatus(DEFAULT_STATUS);
            }

            public Set<Long> getDeletedBlocksTxIds() {
                return Collections.unmodifiableSet(this.deletedBlocksTxIds);
            }

            DatanodeID getDnId() {
                return this.dnId;
            }

            public long getScmCmdId() {
                return this.scmCmdId;
            }

            public CmdStatus getStatus() {
                return this.status;
            }

            public void setStatus(CmdStatus status) {
                this.updateTime = Instant.now();
                this.status = status;
            }

            public Instant getUpdateTime() {
                return this.updateTime;
            }

            public String toString() {
                return "ScmTxStateMachine{dnId=" + this.dnId + ", scmTxID=" + this.scmCmdId + ", deletedBlocksTxIds=" + this.deletedBlocksTxIds + ", updateTime=" + this.updateTime + ", status=" + (Object)((Object)this.status) + '}';
            }
        }
    }
}

