/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.block;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.block.DatanodeDeletedBlockTransactions;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManager;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManagerImpl;
import org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager;
import org.apache.hadoop.hdds.scm.block.ScmBlockDeletingServiceMetrics;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeletedBlockLogImpl
implements DeletedBlockLog,
EventHandler<CommandStatusReportHandler.DeleteBlockStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(DeletedBlockLogImpl.class);
    private final ContainerManager containerManager;
    private final Lock lock;
    private DeletedBlockLogStateManager deletedBlockLogStateManager;
    private final SCMContext scmContext;
    private final SequenceIdGenerator sequenceIdGen;
    private final ScmBlockDeletingServiceMetrics metrics;
    private final SCMDeletedBlockTransactionStatusManager transactionStatusManager;
    private long scmCommandTimeoutMs = Duration.ofSeconds(300L).toMillis();
    private long lastProcessedTransactionId = -1L;
    private final int logAppenderQueueByteLimit;
    private int deletionFactorPerDatanode;

    public DeletedBlockLogImpl(ConfigurationSource conf, StorageContainerManager scm, ContainerManager containerManager, DBTransactionBuffer dbTxBuffer, ScmBlockDeletingServiceMetrics metrics) {
        this.containerManager = containerManager;
        this.lock = new ReentrantLock();
        this.deletedBlockLogStateManager = DeletedBlockLogStateManagerImpl.newBuilder().setConfiguration(conf).setDeletedBlocksTable((Table<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>)scm.getScmMetadataStore().getDeletedBlocksTXTable()).setContainerManager(containerManager).setRatisServer(scm.getScmHAManager().getRatisServer()).setSCMDBTransactionBuffer(dbTxBuffer).build();
        this.scmContext = scm.getScmContext();
        this.sequenceIdGen = scm.getSequenceIdGen();
        this.metrics = metrics;
        this.transactionStatusManager = new SCMDeletedBlockTransactionStatusManager(this.deletedBlockLogStateManager, containerManager, metrics, this.scmCommandTimeoutMs);
        int limit = (int)conf.getStorageSize("ozone.scm.ha.ratis.log.appender.queue.byte-limit", "32MB", StorageUnit.BYTES);
        this.logAppenderQueueByteLimit = (int)((double)limit * 0.9);
        int deletionFactor = conf.getInt("ozone.scm.block.deletion.per.dn.distribution.factor", 8);
        this.deletionFactorPerDatanode = deletionFactor <= 0 ? 1 : deletionFactor;
    }

    @VisibleForTesting
    void setDeletedBlockLogStateManager(DeletedBlockLogStateManager manager) {
        this.deletedBlockLogStateManager = manager;
    }

    @VisibleForTesting
    void setDeleteBlocksFactorPerDatanode(int deleteBlocksFactorPerDatanode) {
        this.deletionFactorPerDatanode = deleteBlocksFactorPerDatanode;
    }

    @Override
    public void incrementCount(List<Long> txIDs) throws IOException {
        this.transactionStatusManager.incrementRetryCount(txIDs);
    }

    private StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction constructNewTransaction(long txID, long containerID, List<Long> blocks) {
        return StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.newBuilder().setTxID(txID).setContainerID(containerID).addAllLocalID(blocks).setCount(0).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getNumOfValidTransactions() throws IOException {
        this.lock.lock();
        try {
            int count = 0;
            try (Table.KeyValueIterator<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction> iter = this.deletedBlockLogStateManager.getReadOnlyIterator();){
                while (iter.hasNext()) {
                    iter.next();
                    ++count;
                }
            }
            int n = count;
            return n;
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void reinitialize(Table<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction> deletedTable) {
        this.deletedBlockLogStateManager.reinitialize(deletedTable);
    }

    public void onBecomeLeader() {
        this.transactionStatusManager.clear();
    }

    public void onFlush() {
        this.deletedBlockLogStateManager.onFlush();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addTransactions(Map<Long, List<Long>> containerBlocksMap) throws IOException {
        this.lock.lock();
        try {
            ArrayList<StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction> txsToBeAdded = new ArrayList<StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>();
            long currentBatchSizeBytes = 0L;
            for (Map.Entry<Long, List<Long>> entry : containerBlocksMap.entrySet()) {
                long nextTXID = this.sequenceIdGen.getNextId("delTxnId");
                StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction tx = this.constructNewTransaction(nextTXID, entry.getKey(), entry.getValue());
                txsToBeAdded.add(tx);
                long txSize = tx.getSerializedSize();
                if ((currentBatchSizeBytes += txSize) < (long)this.logAppenderQueueByteLimit) continue;
                this.deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
                this.metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size());
                txsToBeAdded.clear();
                currentBatchSizeBytes = 0L;
            }
            if (!txsToBeAdded.isEmpty()) {
                this.deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
                this.metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size());
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void close() {
    }

    private void getTransaction(StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction tx, DatanodeDeletedBlockTransactions transactions, Set<ContainerReplica> replicas, Map<DatanodeID, Map<Long, SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus>> commandStatus, int maxDeleteBlocksPerDatanode) {
        if (!replicas.stream().allMatch(replica -> {
            DatanodeID datanodeID = replica.getDatanodeDetails().getID();
            return transactions.getNumberOfBlocksForDatanode(datanodeID) < maxDeleteBlocksPerDatanode;
        })) {
            return;
        }
        boolean flag = false;
        for (ContainerReplica replica2 : replicas) {
            DatanodeID datanodeID = replica2.getDatanodeDetails().getID();
            if (this.transactionStatusManager.isDuplication(datanodeID, tx.getTxID(), commandStatus)) continue;
            transactions.addTransactionToDN(datanodeID, tx);
            flag = true;
        }
        if (flag) {
            this.metrics.incrProcessedTransaction();
        }
    }

    private Boolean checkInadequateReplica(Set<ContainerReplica> replicas, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction txn, Set<DatanodeDetails> dnList) throws ContainerNotFoundException {
        ContainerInfo containerInfo = this.containerManager.getContainer(ContainerID.valueOf((long)txn.getContainerID()));
        ReplicationManager replicationManager = this.scmContext.getScm().getReplicationManager();
        ContainerHealthResult result = replicationManager.getContainerReplicationHealth(containerInfo, replicas);
        long containerId = txn.getContainerID();
        for (ContainerReplica replica : replicas) {
            DatanodeDetails datanodeDetails = replica.getDatanodeDetails();
            if (dnList.contains(datanodeDetails)) continue;
            DatanodeDetails dnDetail = replica.getDatanodeDetails();
            LOG.debug("Skip Container = {}, because DN = {} is not in dnList.", (Object)containerId, (Object)dnDetail);
            return true;
        }
        return result.getHealthState() != ContainerHealthResult.HealthState.HEALTHY;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public DatanodeDeletedBlockTransactions getTransactions(int blockDeletionLimit, Set<DatanodeDetails> dnList) throws IOException {
        this.lock.lock();
        try {
            this.getSCMDeletedBlockTransactionStatusManager().cleanAllTimeoutSCMCommand(this.scmCommandTimeoutMs);
            DatanodeDeletedBlockTransactions transactions = new DatanodeDeletedBlockTransactions();
            try (Table.KeyValueIterator<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction> iter = this.deletedBlockLogStateManager.getReadOnlyIterator();){
                int maxDeleteBlocksPerDatanode;
                if (this.lastProcessedTransactionId != -1L) {
                    iter.seek((Object)this.lastProcessedTransactionId);
                    if (iter.hasNext()) {
                        this.lastProcessedTransactionId = (Long)((Table.KeyValue)iter.next()).getKey();
                    }
                    if (!iter.hasNext()) {
                        iter.seekToFirst();
                        this.lastProcessedTransactionId = -1L;
                    }
                }
                Map<DatanodeID, Map<Long, SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus>> commandStatus = this.getSCMDeletedBlockTransactionStatusManager().getCommandStatusByTxId(dnList.stream().map(DatanodeDetails::getID).collect(Collectors.toSet()));
                ArrayList<Long> txIDs = new ArrayList<Long>();
                this.metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
                Table.KeyValue keyValue = null;
                int factor = dnList.size() / this.deletionFactorPerDatanode;
                int n = maxDeleteBlocksPerDatanode = factor > 0 ? Math.min(blockDeletionLimit, blockDeletionLimit / factor) : blockDeletionLimit;
                while (iter.hasNext() && transactions.getBlocksDeleted() < blockDeletionLimit) {
                    keyValue = (Table.KeyValue)iter.next();
                    StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction txn = (StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction)keyValue.getValue();
                    ContainerID id = ContainerID.valueOf((long)txn.getContainerID());
                    ContainerInfo container = this.containerManager.getContainer(id);
                    try {
                        if (container.isDeleted()) {
                            LOG.warn("Container: {} was deleted for the transaction: {}.", (Object)id, (Object)txn);
                            txIDs.add(txn.getTxID());
                        } else if (!container.isOpen()) {
                            Set<ContainerReplica> replicas = this.containerManager.getContainerReplicas(ContainerID.valueOf((long)txn.getContainerID()));
                            if (!this.checkInadequateReplica(replicas, txn, dnList).booleanValue()) {
                                this.getTransaction(txn, transactions, replicas, commandStatus, maxDeleteBlocksPerDatanode);
                            } else {
                                this.metrics.incrSkippedTransaction();
                            }
                        } else if (this.containerManager.getContainer(id).isOpen()) {
                            this.metrics.incrSkippedTransaction();
                        }
                    }
                    catch (ContainerNotFoundException ex) {
                        LOG.warn("Container: {} was not found for the transaction: {}.", (Object)id, (Object)txn);
                        txIDs.add(txn.getTxID());
                    }
                    if (this.lastProcessedTransactionId == (Long)keyValue.getKey()) break;
                    if (iter.hasNext() || this.lastProcessedTransactionId == -1L) continue;
                    iter.seekToFirst();
                }
                long l = this.lastProcessedTransactionId = keyValue != null ? (Long)keyValue.getKey() : -1L;
                if (!txIDs.isEmpty()) {
                    this.deletedBlockLogStateManager.removeTransactionsFromDB(txIDs);
                    this.metrics.incrBlockDeletionTransactionCompleted(txIDs.size());
                }
            }
            DatanodeDeletedBlockTransactions datanodeDeletedBlockTransactions = transactions;
            return datanodeDeletedBlockTransactions;
        }
        finally {
            this.lock.unlock();
        }
    }

    public void setScmCommandTimeoutMs(long scmCommandTimeoutMs) {
        this.scmCommandTimeoutMs = scmCommandTimeoutMs;
    }

    @VisibleForTesting
    public SCMDeletedBlockTransactionStatusManager getSCMDeletedBlockTransactionStatusManager() {
        return this.transactionStatusManager;
    }

    @Override
    public void recordTransactionCreated(DatanodeID dnId, long scmCmdId, Set<Long> dnTxSet) {
        this.getSCMDeletedBlockTransactionStatusManager().recordTransactionCreated(dnId, scmCmdId, dnTxSet);
    }

    @Override
    public int getTransactionToDNsCommitMapSize() {
        return this.getSCMDeletedBlockTransactionStatusManager().getTransactionToDNsCommitMapSize();
    }

    @Override
    public void onDatanodeDead(DatanodeID dnId) {
        this.getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId);
    }

    @Override
    public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
        this.getSCMDeletedBlockTransactionStatusManager().onSent(dnId, scmCommand);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onMessage(CommandStatusReportHandler.DeleteBlockStatus deleteBlockStatus, EventPublisher publisher) {
        if (!this.scmContext.isLeader()) {
            LOG.info("Skip commit transactions since current SCM is not leader.");
            return;
        }
        DatanodeDetails details = deleteBlockStatus.getDatanodeDetails();
        DatanodeID dnId = details.getID();
        for (StorageContainerDatanodeProtocolProtos.CommandStatus commandStatus : deleteBlockStatus.getCmdStatus()) {
            StorageContainerDatanodeProtocolProtos.CommandStatus.Status status = commandStatus.getStatus();
            this.lock.lock();
            try {
                if (status == StorageContainerDatanodeProtocolProtos.CommandStatus.Status.EXECUTED) {
                    StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto ackProto = commandStatus.getBlockDeletionAck();
                    this.getSCMDeletedBlockTransactionStatusManager().commitTransactions(ackProto.getResultsList(), dnId);
                    this.metrics.incrBlockDeletionCommandSuccess();
                    this.metrics.incrDNCommandsSuccess(dnId, 1L);
                } else if (status == StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED) {
                    this.metrics.incrBlockDeletionCommandFailure();
                    this.metrics.incrDNCommandsFailure(dnId, 1L);
                } else {
                    LOG.debug("Delete Block Command {} is not executed on the Datanode {}.", (Object)commandStatus.getCmdId(), (Object)dnId);
                }
                this.getSCMDeletedBlockTransactionStatusManager().commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId);
            }
            finally {
                this.lock.unlock();
            }
        }
    }
}

