/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.repair;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.cassandra.concurrent.FutureTask;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.apache.cassandra.utils.concurrent.Refs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PendingAntiCompaction {
    // Shared logger; the nested helper classes in this file log through it as well.
    private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompaction.class);
    // Default pause between sstable acquisition attempts, in milliseconds, read from CassandraRelevantProperties.
    private static final int ACQUIRE_SLEEP_MS = CassandraRelevantProperties.ACQUIRE_SLEEP_MS.getInt();
    // Default total time to keep retrying sstable acquisition, in seconds, read from CassandraRelevantProperties.
    private static final int ACQUIRE_RETRY_SECONDS = CassandraRelevantProperties.ACQUIRE_RETRY_SECONDS.getInt();
    // Parent repair session id; handed to both the acquisition callable and the acquisition callback.
    private final TimeUUID prsId;
    // Tables to process: each one is flushed and gets its own acquisition task in run().
    private final Collection<ColumnFamilyStore> tables;
    // Ranges under repair; used to select sstables and passed on to the anti-compaction submission.
    private final RangesAtEndpoint tokenRanges;
    // Executor the per-table acquisition tasks are submitted to.
    private final ExecutorService executor;
    // Effective retry window (seconds) and sleep interval (milliseconds) for this instance.
    private final int acquireRetrySeconds;
    private final int acquireSleepMillis;
    // Forwarded to CompactionManager.submitPendingAntiCompaction; presumably polled there to stop early — confirm.
    private final BooleanSupplier isCancelled;

    /**
     * Creates a pending anti-compaction that uses the class-level default retry window
     * ({@code ACQUIRE_RETRY_SECONDS}) and sleep interval ({@code ACQUIRE_SLEEP_MS}).
     *
     * @param prsId       parent repair session id
     * @param tables      tables whose sstables should be acquired
     * @param tokenRanges ranges being repaired
     * @param executor    executor that runs the per-table acquisition tasks
     * @param isCancelled cancellation signal forwarded to the anti-compaction submission
     */
    public PendingAntiCompaction(TimeUUID prsId, Collection<ColumnFamilyStore> tables, RangesAtEndpoint tokenRanges, ExecutorService executor, BooleanSupplier isCancelled) {
        this(prsId,
             tables,
             tokenRanges,
             ACQUIRE_RETRY_SECONDS,
             ACQUIRE_SLEEP_MS,
             executor,
             isCancelled);
    }

    /**
     * Full constructor; lets tests choose the acquisition retry window and sleep interval explicitly.
     *
     * @param prsId               parent repair session id
     * @param tables              tables whose sstables should be acquired
     * @param tokenRanges         ranges being repaired
     * @param acquireRetrySeconds how long, in seconds, to keep retrying acquisition
     * @param acquireSleepMillis  pause between acquisition attempts, in milliseconds
     * @param executor            executor that runs the per-table acquisition tasks
     * @param isCancelled         cancellation signal forwarded to the anti-compaction submission
     */
    @VisibleForTesting
    PendingAntiCompaction(TimeUUID prsId, Collection<ColumnFamilyStore> tables, RangesAtEndpoint tokenRanges, int acquireRetrySeconds, int acquireSleepMillis, ExecutorService executor, BooleanSupplier isCancelled) {
        // What is being repaired.
        this.prsId = prsId;
        this.tables = tables;
        this.tokenRanges = tokenRanges;

        // How acquisition is retried.
        this.acquireRetrySeconds = acquireRetrySeconds;
        this.acquireSleepMillis = acquireSleepMillis;

        // How the work is executed and cancelled.
        this.executor = executor;
        this.isCancelled = isCancelled;
    }

    /**
     * Starts the prepare-phase work for every table.
     *
     * <p>For each table this performs a blocking flush (flush reason {@code ANTICOMPACTION}) and then
     * submits an sstable acquisition task to the executor. The per-table results are combined with
     * {@code FutureCombiner.successfulOf} and handed to the acquisition callback, which either submits
     * the anti-compactions or fails the returned future.
     *
     * @return a future that completes once the callback's resulting future completes
     */
    public Future<List<Void>> run() {
        List<FutureTask<AcquireResult>> tasks = new ArrayList<>(this.tables.size());
        for (ColumnFamilyStore cfs : this.tables) {
            // Flush before acquiring — presumably so data still in memory is included; confirm against ColumnFamilyStore.
            cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.ANTICOMPACTION);
            FutureTask<AcquireResult> task = new FutureTask<>(this.getAcquisitionCallable(cfs, this.tokenRanges.ranges(), this.prsId, this.acquireRetrySeconds, this.acquireSleepMillis));
            this.executor.submit(task);
            tasks.add(task);
        }
        // One AcquireResult per table: the element type must match the callback's Function<List<AcquireResult>, ...> input.
        Future<List<AcquireResult>> acquisitionResults = FutureCombiner.successfulOf(tasks);
        return acquisitionResults.flatMap(this.getAcquisitionCallback(this.prsId, this.tokenRanges));
    }

    /**
     * Factory for the per-table sstable acquisition task; protected so a subclass (e.g. in tests) can
     * supply a different callable.
     *
     * @param cfs                 table to acquire sstables from
     * @param ranges              token ranges the sstables must intersect
     * @param prsId               parent repair session id
     * @param acquireRetrySeconds how long, in seconds, to keep retrying acquisition
     * @param acquireSleepMillis  pause between acquisition attempts, in milliseconds
     * @return a new {@link AcquisitionCallable} built from the given arguments
     */
    @VisibleForTesting
    protected AcquisitionCallable getAcquisitionCallable(ColumnFamilyStore cfs, Set<Range<Token>> ranges, TimeUUID prsId, int acquireRetrySeconds, int acquireSleepMillis) {
        return new AcquisitionCallable(cfs,
                                       ranges,
                                       prsId,
                                       acquireRetrySeconds,
                                       acquireSleepMillis);
    }

    /**
     * Factory for the callback that consumes the acquisition results; protected so a subclass
     * (e.g. in tests) can supply a different callback.
     *
     * @param prsId       parent repair session id
     * @param tokenRanges ranges being repaired
     * @return a new {@link AcquisitionCallback} that also carries this instance's cancellation signal
     */
    @VisibleForTesting
    protected AcquisitionCallback getAcquisitionCallback(TimeUUID prsId, RangesAtEndpoint tokenRanges) {
        return new AcquisitionCallback(prsId,
                                       tokenRanges,
                                       this.isCancelled);
    }

    /**
     * Consumes the per-table acquisition results. If every table's sstables were acquired cleanly it
     * submits one pending anti-compaction per table that has a transaction; otherwise it aborts every
     * non-null result and returns a failed future.
     */
    static class AcquisitionCallback
    implements Function<List<AcquireResult>, Future<List<Void>>> {
        private final TimeUUID parentRepairSession;
        private final RangesAtEndpoint tokenRanges;
        private final BooleanSupplier isCancelled;

        /**
         * @param parentRepairSession parent repair session id, used for submission and in the failure message
         * @param tokenRanges         ranges being repaired
         * @param isCancelled         cancellation signal forwarded to the anti-compaction submission
         */
        public AcquisitionCallback(TimeUUID parentRepairSession, RangesAtEndpoint tokenRanges, BooleanSupplier isCancelled) {
            this.parentRepairSession = parentRepairSession;
            this.tokenRanges = tokenRanges;
            this.isCancelled = isCancelled;
        }

        /**
         * Hands one acquired result to the compaction manager.
         *
         * @param result an acquisition result with a transaction
         * @return the future returned by {@code CompactionManager.submitPendingAntiCompaction}
         */
        Future<Void> submitPendingAntiCompaction(AcquireResult result) {
            return CompactionManager.instance.submitPendingAntiCompaction(result.cfs,
                                                                          this.tokenRanges,
                                                                          result.refs,
                                                                          result.txn,
                                                                          this.parentRepairSession,
                                                                          this.isCancelled);
        }

        /**
         * Decides whether a single acquisition result forces the whole prepare phase to abort.
         *
         * @param result acquisition result for one table, or {@code null} if acquisition failed
         * @return {@code true} if the result is null, or if any acquired sstable already has a pending
         *         repair session or a non-zero {@code repairedAt}
         */
        private static boolean shouldAbort(AcquireResult result) {
            if (result == null) {
                return true;
            }
            if (result.refs == null) {
                // Nothing was acquired for this table, so nothing can conflict.
                return false;
            }
            for (SSTableReader sstable : result.refs) {
                StatsMetadata metadata = sstable.getSSTableMetadata();
                if (metadata.pendingRepair != ActiveRepairService.NO_PENDING_REPAIR || metadata.repairedAt != 0L) {
                    return true;
                }
            }
            return false;
        }

        /**
         * @param results one acquisition result per table; entries may be null when acquisition failed
         * @return the combined anti-compaction future, or a future failed with
         *         {@link SSTableAcquisitionException} when any result requires an abort
         */
        @Override
        public Future<List<Void>> apply(List<AcquireResult> results) {
            boolean everythingAcquired = !Iterables.any(results, AcquisitionCallback::shouldAbort);
            if (everythingAcquired) {
                List<Future<Void>> submitted = new ArrayList<>(results.size());
                for (AcquireResult acquired : results) {
                    // Results without a transaction hold no sstables, so there is nothing to anti-compact.
                    if (acquired.txn != null) {
                        submitted.add(this.submitPendingAntiCompaction(acquired));
                    }
                }
                return FutureCombiner.allOf(submitted);
            }

            // At least one table could not be acquired cleanly: give back everything that was acquired.
            for (AcquireResult acquired : results) {
                if (acquired != null) {
                    logger.info("Releasing acquired sstables for {}.{}", acquired.cfs.metadata.keyspace, acquired.cfs.metadata.name);
                    acquired.abort();
                }
            }
            String message = String.format("Prepare phase for incremental repair session %s was unable to acquire exclusive access to the neccesary sstables. This is usually caused by running multiple incremental repairs on nodes that share token ranges", this.parentRepairSession);
            logger.warn(message);
            return ImmediateFuture.failure(new SSTableAcquisitionException(message));
        }
    }

    public static class AcquisitionCallable
    implements Callable<AcquireResult> {
        private final ColumnFamilyStore cfs;
        private final TimeUUID sessionID;
        private final AntiCompactionPredicate predicate;
        private final int acquireRetrySeconds;
        private final int acquireSleepMillis;

        @VisibleForTesting
        public AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, TimeUUID sessionID, int acquireRetrySeconds, int acquireSleepMillis) {
            this(cfs, sessionID, acquireRetrySeconds, acquireSleepMillis, new AntiCompactionPredicate(ranges, sessionID));
        }

        @VisibleForTesting
        AcquisitionCallable(ColumnFamilyStore cfs, TimeUUID sessionID, int acquireRetrySeconds, int acquireSleepMillis, AntiCompactionPredicate predicate) {
            this.cfs = cfs;
            this.sessionID = sessionID;
            this.predicate = predicate;
            this.acquireRetrySeconds = acquireRetrySeconds;
            this.acquireSleepMillis = acquireSleepMillis;
        }

        private AcquireResult acquireTuple() {
            try {
                Set sstables = this.cfs.getLiveSSTables().stream().filter(this.predicate).collect(Collectors.toSet());
                if (sstables.isEmpty()) {
                    return new AcquireResult(this.cfs, null, null);
                }
                LifecycleTransaction txn = this.cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
                if (txn != null) {
                    return new AcquireResult(this.cfs, Refs.ref(sstables), txn);
                }
                logger.error("Could not mark compacting for {} (sstables = {}, compacting = {})", new Object[]{this.sessionID, sstables, this.cfs.getTracker().getCompacting()});
            }
            catch (SSTableAcquisitionException e) {
                logger.warn(e.getMessage());
                logger.debug("Got exception trying to acquire sstables", (Throwable)e);
            }
            return null;
        }

        protected AcquireResult acquireSSTables() {
            return this.cfs.runWithCompactionsDisabled(this::acquireTuple, this.predicate, OperationType.ANTICOMPACTION, false, false, false);
        }

        @Override
        public AcquireResult call() {
            logger.debug("acquiring sstables for pending anti compaction on session {}", (Object)this.sessionID);
            long start = Clock.Global.currentTimeMillis();
            long delay = TimeUnit.SECONDS.toMillis(this.acquireRetrySeconds);
            do {
                try {
                    return this.acquireSSTables();
                }
                catch (SSTableAcquisitionException e) {
                    logger.warn("Session {} failed acquiring sstables: {}, retrying every {}ms for another {}s", new Object[]{this.sessionID, e.getMessage(), this.acquireSleepMillis, TimeUnit.SECONDS.convert(delay + start - Clock.Global.currentTimeMillis(), TimeUnit.MILLISECONDS)});
                    Uninterruptibles.sleepUninterruptibly((long)this.acquireSleepMillis, (TimeUnit)TimeUnit.MILLISECONDS);
                    if (Clock.Global.currentTimeMillis() - start <= delay) continue;
                    logger.warn("{} Timed out waiting to acquire sstables", (Object)this.sessionID, (Object)e);
                }
                catch (Throwable t) {
                    logger.error("Got exception disabling compactions for session {}", (Object)this.sessionID, (Object)t);
                    throw t;
                }
            } while (Clock.Global.currentTimeMillis() - start < delay);
            return null;
        }
    }

    @VisibleForTesting
    static class AntiCompactionPredicate
    implements Predicate<SSTableReader> {
        private final Collection<Range<Token>> ranges;
        private final TimeUUID prsid;

        public AntiCompactionPredicate(Collection<Range<Token>> ranges, TimeUUID prsid) {
            this.ranges = ranges;
            this.prsid = prsid;
        }

        public boolean apply(SSTableReader sstable) {
            if (!sstable.intersects(this.ranges)) {
                return false;
            }
            StatsMetadata metadata = sstable.getSSTableMetadata();
            if (metadata.repairedAt != 0L) {
                return false;
            }
            if (!sstable.descriptor.version.hasPendingRepair()) {
                String message = String.format("Prepare phase failed because it encountered legacy sstables that don't support pending repair, run upgradesstables before starting incremental repairs, repair session (%s)", this.prsid);
                throw new SSTableAcquisitionException(message);
            }
            if (metadata.pendingRepair != ActiveRepairService.NO_PENDING_REPAIR) {
                if (!ActiveRepairService.instance().consistent.local.isSessionFinalized(metadata.pendingRepair)) {
                    String message = String.format("Prepare phase for incremental repair session %s has failed because it encountered intersecting sstables belonging to another incremental repair session (%s). This is caused by starting an incremental repair session before a previous one has completed. Check nodetool repair_admin for hung sessions and fix them.", this.prsid, metadata.pendingRepair);
                    throw new SSTableAcquisitionException(message);
                }
                return false;
            }
            Collection<CompactionInfo> cis = CompactionManager.instance.active.getCompactionsForSSTable(sstable, OperationType.ANTICOMPACTION);
            if (cis != null && !cis.isEmpty()) {
                StringBuilder sb = new StringBuilder();
                sb.append("Prepare phase for incremental repair session ");
                sb.append(this.prsid);
                sb.append(" has failed because it encountered intersecting sstables belonging to another incremental repair session. ");
                sb.append("This is caused by starting multiple conflicting incremental repairs at the same time. ");
                sb.append("Conflicting anticompactions: ");
                for (CompactionInfo ci : cis) {
                    sb.append(ci.getTaskId() == null ? "no compaction id" : ci.getTaskId()).append(':').append(ci.getSSTables()).append(',');
                }
                throw new SSTableAcquisitionException(sb.toString());
            }
            return true;
        }
    }

    /**
     * Unchecked exception raised in this file when the sstables needed for a pending anti-compaction
     * cannot be acquired; it carries only a human-readable message.
     */
    static class SSTableAcquisitionException
    extends RuntimeException {
        /**
         * @param message description of why acquisition failed
         */
        SSTableAcquisitionException(String message) {
            super(message);
        }
    }

    /**
     * Outcome of acquiring sstables for one table: the table, the references taken on the acquired
     * sstables and the transaction that owns them. {@code refs} and {@code txn} may be {@code null};
     * {@link #abort()} tolerates either being absent.
     */
    public static class AcquireResult {
        final ColumnFamilyStore cfs;
        final Refs<SSTableReader> refs;
        final LifecycleTransaction txn;

        /**
         * @param cfs  table the result belongs to
         * @param refs references on the acquired sstables, or {@code null} if none were acquired
         * @param txn  transaction over the acquired sstables, or {@code null} if none were acquired
         */
        AcquireResult(ColumnFamilyStore cfs, Refs<SSTableReader> refs, LifecycleTransaction txn) {
            this.cfs = cfs;
            this.refs = refs;
            this.txn = txn;
        }

        /**
         * Gives back everything this result holds: aborts the transaction (if any) and releases the
         * sstable references (if any).
         */
        @VisibleForTesting
        public void abort() {
            try {
                if (this.txn != null) {
                    this.txn.abort();
                }
            } finally {
                // Release in finally so a failing transaction abort cannot leak the sstable references.
                if (this.refs != null) {
                    this.refs.release();
                }
            }
        }
    }
}

