/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.block;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.block.DatanodeDeletedBlockTransactions;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManager;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManagerImpl;
import org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager;
import org.apache.hadoop.hdds.scm.block.ScmBlockDeletingServiceMetrics;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeletedBlockLogImpl
implements DeletedBlockLog,
EventHandler<CommandStatusReportHandler.DeleteBlockStatus> {
    public static final Logger LOG = LoggerFactory.getLogger(DeletedBlockLogImpl.class);
    private final int maxRetry;
    private final ContainerManager containerManager;
    private final Lock lock;
    private final DeletedBlockLogStateManager deletedBlockLogStateManager;
    private final SCMContext scmContext;
    private final SequenceIdGenerator sequenceIdGen;
    private final ScmBlockDeletingServiceMetrics metrics;
    private final SCMDeletedBlockTransactionStatusManager transactionStatusManager;
    private long scmCommandTimeoutMs = Duration.ofSeconds(300L).toMillis();
    private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
    private long lastProcessedTransactionId = -1L;

    public DeletedBlockLogImpl(ConfigurationSource conf, StorageContainerManager scm, ContainerManager containerManager, DBTransactionBuffer dbTxBuffer, ScmBlockDeletingServiceMetrics metrics) {
        this.maxRetry = conf.getInt("ozone.scm.block.deletion.max.retry", 4096);
        this.containerManager = containerManager;
        this.lock = new ReentrantLock();
        this.deletedBlockLogStateManager = DeletedBlockLogStateManagerImpl.newBuilder().setConfiguration(conf).setDeletedBlocksTable((Table<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>)scm.getScmMetadataStore().getDeletedBlocksTXTable()).setContainerManager(containerManager).setRatisServer(scm.getScmHAManager().getRatisServer()).setSCMDBTransactionBuffer(dbTxBuffer).build();
        this.scmContext = scm.getScmContext();
        this.sequenceIdGen = scm.getSequenceIdGen();
        this.metrics = metrics;
        this.transactionStatusManager = new SCMDeletedBlockTransactionStatusManager(this.deletedBlockLogStateManager, containerManager, metrics, this.scmCommandTimeoutMs);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction> getFailedTransactions(int count, long startTxId) throws IOException {
        this.lock.lock();
        try {
            ArrayList failedTXs = Lists.newArrayList();
            try (TableIterator<Long, Table.KeyValue<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>> iter = this.deletedBlockLogStateManager.getReadOnlyIterator();){
                StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction delTX;
                if (count == -1) {
                    while (iter.hasNext()) {
                        delTX = (StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction)((Table.KeyValue)iter.next()).getValue();
                        if (delTX.getCount() != -1) continue;
                        failedTXs.add(delTX);
                    }
                } else {
                    iter.seek((Object)startTxId);
                    while (iter.hasNext() && failedTXs.size() < count) {
                        delTX = (StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction)((Table.KeyValue)iter.next()).getValue();
                        if (delTX.getCount() != -1 || delTX.getTxID() < startTxId) continue;
                        failedTXs.add(delTX);
                    }
                }
            }
            ArrayList arrayList = failedTXs;
            return arrayList;
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void incrementCount(List<Long> txIDs) throws IOException {
        this.lock.lock();
        try {
            this.transactionStatusManager.incrementRetryCount(txIDs, this.maxRetry);
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public int resetCount(List<Long> txIDs) throws IOException {
        int batchSize = 1000;
        int totalProcessed = 0;
        try {
            List<StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction> batch;
            if (txIDs != null && !txIDs.isEmpty()) {
                return this.resetRetryCount(txIDs);
            }
            long startTxId = 0L;
            while (!(batch = this.getFailedTransactions(1000, startTxId)).isEmpty()) {
                List batchTxIDs = batch.stream().map(StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction::getTxID).collect(Collectors.toList());
                totalProcessed += this.resetRetryCount(new ArrayList<Long>(batchTxIDs));
                startTxId = batch.get(batch.size() - 1).getTxID() + 1L;
                if (!batch.isEmpty()) continue;
                break;
            }
        }
        catch (Exception e) {
            throw new IOException("Error during transaction reset", e);
        }
        return totalProcessed;
    }

    private int resetRetryCount(List<Long> txIDs) throws IOException {
        int totalProcessed;
        this.lock.lock();
        try {
            this.transactionStatusManager.resetRetryCount(txIDs);
            totalProcessed = this.deletedBlockLogStateManager.resetRetryCountOfTransactionInDB(new ArrayList<Long>(txIDs));
        }
        finally {
            this.lock.unlock();
        }
        return totalProcessed;
    }

    private StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction constructNewTransaction(long txID, long containerID, List<Long> blocks) {
        return StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.newBuilder().setTxID(txID).setContainerID(containerID).addAllLocalID(blocks).setCount(0).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getNumOfValidTransactions() throws IOException {
        this.lock.lock();
        try {
            AtomicInteger num = new AtomicInteger(0);
            try (TableIterator<Long, Table.KeyValue<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>> iter = this.deletedBlockLogStateManager.getReadOnlyIterator();){
                while (iter.hasNext()) {
                    StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction delTX = (StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction)((Table.KeyValue)iter.next()).getValue();
                    if (delTX.getCount() <= -1) continue;
                    num.incrementAndGet();
                }
            }
            int n = num.get();
            return n;
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void reinitialize(Table<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction> deletedTable) {
        this.deletedBlockLogStateManager.reinitialize(deletedTable);
    }

    public void onBecomeLeader() {
        this.transactionStatusManager.clear();
    }

    public void onFlush() {
        this.deletedBlockLogStateManager.onFlush();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addTransactions(Map<Long, List<Long>> containerBlocksMap) throws IOException {
        this.lock.lock();
        try {
            ArrayList<StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction> txsToBeAdded = new ArrayList<StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>();
            for (Map.Entry<Long, List<Long>> entry : containerBlocksMap.entrySet()) {
                long nextTXID = this.sequenceIdGen.getNextId("delTxnId");
                StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction tx = this.constructNewTransaction(nextTXID, entry.getKey(), entry.getValue());
                txsToBeAdded.add(tx);
            }
            this.deletedBlockLogStateManager.addTransactionsToDB(txsToBeAdded);
            this.metrics.incrBlockDeletionTransactionCreated(txsToBeAdded.size());
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void close() throws IOException {
    }

    private void getTransaction(StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction tx, DatanodeDeletedBlockTransactions transactions, Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas, Map<UUID, Map<Long, SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus>> commandStatus) {
        StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction updatedTxn = StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.newBuilder((StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction)tx).setCount(this.transactionStatusManager.getOrDefaultRetryCount(tx.getTxID(), 0)).build();
        for (ContainerReplica replica : replicas) {
            DatanodeDetails details = replica.getDatanodeDetails();
            if (this.transactionStatusManager.isDuplication(details, updatedTxn.getTxID(), commandStatus)) continue;
            transactions.addTransactionToDN(details.getUuid(), updatedTxn);
            this.metrics.incrProcessedTransaction();
        }
    }

    private Boolean checkInadequateReplica(Set<ContainerReplica> replicas, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction txn, Set<DatanodeDetails> dnList) throws ContainerNotFoundException {
        ContainerInfo containerInfo = this.containerManager.getContainer(ContainerID.valueOf((long)txn.getContainerID()));
        ReplicationManager replicationManager = this.scmContext.getScm().getReplicationManager();
        ContainerHealthResult result = replicationManager.getContainerReplicationHealth(containerInfo, replicas);
        long containerId = txn.getContainerID();
        for (ContainerReplica replica : replicas) {
            DatanodeDetails datanodeDetails = replica.getDatanodeDetails();
            if (dnList.contains(datanodeDetails)) continue;
            DatanodeDetails dnDetail = replica.getDatanodeDetails();
            LOG.debug("Skip Container = {}, because DN = {} is not in dnList.", (Object)containerId, (Object)dnDetail.getUuid());
            return true;
        }
        return result.getHealthState() != ContainerHealthResult.HealthState.HEALTHY;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public DatanodeDeletedBlockTransactions getTransactions(int blockDeletionLimit, Set<DatanodeDetails> dnList) throws IOException {
        this.lock.lock();
        try {
            this.getSCMDeletedBlockTransactionStatusManager().cleanAllTimeoutSCMCommand(this.scmCommandTimeoutMs);
            DatanodeDeletedBlockTransactions transactions = new DatanodeDeletedBlockTransactions();
            try (TableIterator<Long, Table.KeyValue<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>> iter = this.deletedBlockLogStateManager.getReadOnlyIterator();){
                if (this.lastProcessedTransactionId != -1L) {
                    iter.seek((Object)this.lastProcessedTransactionId);
                    if (iter.hasNext()) {
                        this.lastProcessedTransactionId = (Long)((Table.KeyValue)iter.next()).getKey();
                    }
                    if (!iter.hasNext()) {
                        iter.seekToFirst();
                        this.lastProcessedTransactionId = -1L;
                    }
                }
                Map<UUID, Map<Long, SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus>> commandStatus = this.getSCMDeletedBlockTransactionStatusManager().getCommandStatusByTxId(dnList.stream().map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
                ArrayList<Long> txIDs = new ArrayList<Long>();
                this.metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
                Table.KeyValue keyValue = null;
                while (iter.hasNext() && transactions.getBlocksDeleted() < blockDeletionLimit) {
                    keyValue = (Table.KeyValue)iter.next();
                    StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction txn = (StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction)keyValue.getValue();
                    ContainerID id = ContainerID.valueOf((long)txn.getContainerID());
                    try {
                        if (this.containerManager.getContainer(id).isDeleted()) {
                            LOG.warn("Container: {} was deleted for the transaction: {}.", (Object)id, (Object)txn);
                            txIDs.add(txn.getTxID());
                        } else if (txn.getCount() > -1 && txn.getCount() <= this.maxRetry && !this.containerManager.getContainer(id).isOpen()) {
                            Set<ContainerReplica> replicas = this.containerManager.getContainerReplicas(ContainerID.valueOf((long)txn.getContainerID()));
                            if (this.checkInadequateReplica(replicas, txn, dnList).booleanValue()) {
                                this.metrics.incrSkippedTransaction();
                                continue;
                            }
                            this.getTransaction(txn, transactions, dnList, replicas, commandStatus);
                        } else if (txn.getCount() >= this.maxRetry || this.containerManager.getContainer(id).isOpen()) {
                            this.metrics.incrSkippedTransaction();
                        }
                    }
                    catch (ContainerNotFoundException ex) {
                        LOG.warn("Container: {} was not found for the transaction: {}.", (Object)id, (Object)txn);
                        txIDs.add(txn.getTxID());
                    }
                    if (this.lastProcessedTransactionId == (Long)keyValue.getKey()) break;
                    if (iter.hasNext() || this.lastProcessedTransactionId == -1L) continue;
                    iter.seekToFirst();
                }
                long l = this.lastProcessedTransactionId = keyValue != null ? (Long)keyValue.getKey() : -1L;
                if (!txIDs.isEmpty()) {
                    this.deletedBlockLogStateManager.removeTransactionsFromDB(txIDs);
                    this.metrics.incrBlockDeletionTransactionCompleted(txIDs.size());
                }
            }
            DatanodeDeletedBlockTransactions datanodeDeletedBlockTransactions = transactions;
            return datanodeDeletedBlockTransactions;
        }
        finally {
            this.lock.unlock();
        }
    }

    public void setScmCommandTimeoutMs(long scmCommandTimeoutMs) {
        this.scmCommandTimeoutMs = scmCommandTimeoutMs;
    }

    @VisibleForTesting
    public SCMDeletedBlockTransactionStatusManager getSCMDeletedBlockTransactionStatusManager() {
        return this.transactionStatusManager;
    }

    @Override
    public void recordTransactionCreated(UUID dnId, long scmCmdId, Set<Long> dnTxSet) {
        this.getSCMDeletedBlockTransactionStatusManager().recordTransactionCreated(dnId, scmCmdId, dnTxSet);
    }

    @Override
    public void onDatanodeDead(UUID dnId) {
        this.getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId);
    }

    @Override
    public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
        this.getSCMDeletedBlockTransactionStatusManager().onSent(dnId, scmCommand);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onMessage(CommandStatusReportHandler.DeleteBlockStatus deleteBlockStatus, EventPublisher publisher) {
        if (!this.scmContext.isLeader()) {
            LOG.info("Skip commit transactions since current SCM is not leader.");
            return;
        }
        DatanodeDetails details = deleteBlockStatus.getDatanodeDetails();
        UUID dnId = details.getUuid();
        for (StorageContainerDatanodeProtocolProtos.CommandStatus commandStatus : deleteBlockStatus.getCmdStatus()) {
            StorageContainerDatanodeProtocolProtos.CommandStatus.Status status = commandStatus.getStatus();
            this.lock.lock();
            try {
                if (status == StorageContainerDatanodeProtocolProtos.CommandStatus.Status.EXECUTED) {
                    StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto ackProto = commandStatus.getBlockDeletionAck();
                    this.getSCMDeletedBlockTransactionStatusManager().commitTransactions(ackProto.getResultsList(), dnId);
                    this.metrics.incrBlockDeletionCommandSuccess();
                    this.metrics.incrDNCommandsSuccess(dnId, 1L);
                } else if (status == StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED) {
                    this.metrics.incrBlockDeletionCommandFailure();
                    this.metrics.incrDNCommandsFailure(dnId, 1L);
                } else {
                    LOG.debug("Delete Block Command {} is not executed on the Datanode {}.", (Object)commandStatus.getCmdId(), (Object)dnId);
                }
                this.getSCMDeletedBlockTransactionStatusManager().commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId);
            }
            finally {
                this.lock.unlock();
            }
        }
    }
}

