/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.management.openmbean.CompositeData;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.DurationSpec;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.EndpointsByRange;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.metrics.RepairMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.CommonRange;
import org.apache.cassandra.repair.NoSuchRepairSessionException;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.RepairSession;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.repair.consistent.CoordinatorSessions;
import org.apache.cassandra.repair.consistent.LocalSessions;
import org.apache.cassandra.repair.consistent.RepairedState;
import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
import org.apache.cassandra.repair.consistent.admin.PendingStats;
import org.apache.cassandra.repair.consistent.admin.RepairStats;
import org.apache.cassandra.repair.consistent.admin.SchemaArgsParser;
import org.apache.cassandra.repair.messages.CleanupMessage;
import org.apache.cassandra.repair.messages.PrepareMessage;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.repair.messages.ValidationResponse;
import org.apache.cassandra.repair.state.CoordinatorState;
import org.apache.cassandra.repair.state.ParticipateState;
import org.apache.cassandra.repair.state.ValidationState;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairServiceMBean;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.paxos.PaxosRepair;
import org.apache.cassandra.service.paxos.cleanup.PaxosCleanup;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.MerkleTrees;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Simulate;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Simulate(with={Simulate.With.MONITORS})
public class ActiveRepairService
implements IEndpointStateChangeSubscriber,
IFailureDetectionEventListener,
ActiveRepairServiceMBean {
    /** Consistent (incremental) repair session tracking; this service starts, stops and queries its {@code local} member. */
    public final ConsistentSessions consistent;
    /** Set once this service has registered itself with gossip and the failure detector (see registerParentRepairSession). */
    private boolean registeredForEndpointChanges = false;
    private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
    /** The value {@link #getRepairedAt} returns when no repaired-at timestamp applies. */
    public static final long UNREPAIRED_SSTABLE = 0L;
    /** Null marker for "no pending repair". */
    public static final TimeUUID NO_PENDING_REPAIR = null;
    /** Running repair sessions keyed by session id; a session's completion listener removes its entry. */
    private final ConcurrentMap<TimeUUID, RepairSession> sessions = new ConcurrentHashMap<TimeUUID, RepairSession>();
    /** Parent repair sessions keyed by parent session id. */
    private final ConcurrentMap<TimeUUID, ParentRepairSession> parentRepairSessions = new ConcurrentHashMap<TimeUUID, ParentRepairSession>();
    /** Coordinator-side repair state keyed by parent session id; bounded and expiring (configured in the constructor). */
    private final Cache<TimeUUID, CoordinatorState> repairs;
    /** Participant-side repair state; bounded and expiring with the same settings as {@link #repairs}. */
    private final Cache<TimeUUID, ParticipateState> participates;
    /** Shared services (clock, gossiper, failure detector, messaging, executors) used by this service. */
    public final SharedContext ctx;
    /** Handle of the periodic cleanup task scheduled by {@link #start()}; null until then. */
    private volatile ScheduledFuture<?> irCleanup;
    /** Status and messages recorded per repair command number; bounded and expiring. */
    private final Cache<Integer, Pair<ParentRepairStatus, List<String>>> repairStatusByCmd;
    /** Executor on which snapshots of finished parent sessions are cleared (see removeParentRepairSession). */
    public final ExecutorPlus snapshotExecutor;

    /**
     * Returns the shared service instance held by {@code Holder}.
     */
    public static ActiveRepairService instance() {
        return Holder.instance;
    }

    /**
     * Builds the pooled "Repair-Task" executor.
     *
     * @param maxPoolSize pool size passed to the executor factory
     * @param strategy    when {@code reject}, the queue limit is 0; otherwise it is {@link Integer#MAX_VALUE}
     * @return the configured executor, with a one hour keep-alive and an abort policy for rejected tasks
     */
    @VisibleForTesting
    static ExecutorPlus initializeExecutor(int maxPoolSize, Config.RepairCommandPoolFullStrategy strategy) {
        int queueLimit = strategy == Config.RepairCommandPoolFullStrategy.reject ? 0 : Integer.MAX_VALUE;
        return (ExecutorPlus)ExecutorFactory.Global.executorFactory()
                .localAware()
                .withJmxInternal()
                .configurePooled("Repair-Task", maxPoolSize)
                .withKeepAlive(1L, TimeUnit.HOURS)
                .withQueueLimit(queueLimit)
                .withRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy())
                .build();
    }

    /**
     * Returns the executor held by {@code RepairCommandExecutorHandle}.
     */
    public static ExecutorPlus repairCommandExecutor() {
        return RepairCommandExecutorHandle.repairCommandExecutor;
    }

    /**
     * Creates the service on top of the global {@link SharedContext}.
     */
    public ActiveRepairService() {
        this(SharedContext.Global.instance);
    }

    /**
     * Creates the service on top of the given context: builds the snapshot executor and the
     * status/state caches, then registers this object as an MBean.
     *
     * @param ctx context supplying the executor factory and the MBean wrapper
     */
    @VisibleForTesting
    public ActiveRepairService(SharedContext ctx) {
        this.ctx = ctx;
        this.consistent = new ConsistentSessions(ctx);
        this.snapshotExecutor = (ExecutorPlus)ctx.executorFactory().configurePooled("RepairSnapshotExecutor", 1).withKeepAlive(1L, TimeUnit.HOURS).build();
        // Per-command status entries expire after a configurable number of seconds and are capped in count.
        this.repairStatusByCmd = CacheBuilder.newBuilder().expireAfterWrite(CassandraRelevantProperties.PARENT_REPAIR_STATUS_EXPIRY_SECONDS.getLong(), TimeUnit.SECONDS).maximumSize(CassandraRelevantProperties.PARENT_REPAIR_STATUS_CACHE_SIZE.getLong()).build();
        DurationSpec.LongNanosecondsBound duration = DatabaseDescriptor.getRepairStateExpires();
        int numElements = DatabaseDescriptor.getRepairStateSize();
        logger.info("Storing repair state for {} or for {} elements", (Object)duration, (Object)numElements);
        // Coordinator and participant state share the same expiry and size bound.
        this.repairs = CacheBuilder.newBuilder().expireAfterWrite(duration.quantity(), duration.unit()).maximumSize((long)numElements).build();
        this.participates = CacheBuilder.newBuilder().expireAfterWrite(duration.quantity(), duration.unit()).maximumSize((long)numElements).build();
        // NOTE(review): 'this' is published to the MBean wrapper before the constructor returns.
        ctx.mbean().registerMBean((Object)this, "org.apache.cassandra.db:type=RepairService");
    }

    /**
     * Starts the local consistent-session tracker and schedules its {@code cleanup} to run
     * at a fixed rate of {@code LocalSessions.CLEANUP_INTERVAL} seconds, beginning immediately.
     */
    public void start() {
        this.consistent.local.start();
        this.irCleanup = this.ctx.optionalTasks().scheduleAtFixedRate(this.consistent.local::cleanup, 0L, LocalSessions.CLEANUP_INTERVAL, TimeUnit.SECONDS);
    }

    /**
     * Drops every cached coordinator and participant repair state.
     */
    @VisibleForTesting
    public void clearLocalRepairState() {
        this.repairs.invalidateAll();
        this.participates.invalidateAll();
    }

    /**
     * Cancels the periodic cleanup task, if one was scheduled, without interrupting a run in
     * progress, then stops the local consistent-session tracker.
     */
    public void stop() {
        // Read the volatile field once so the null check and the cancel see the same task.
        ScheduledFuture<?> cleanupTask = this.irCleanup;
        if (cleanupTask != null) {
            cleanupTask.cancel(false);
        }
        this.consistent.local.stop();
    }

    /**
     * Returns session info from the local consistent-session tracker.
     *
     * @param all       forwarded unchanged to {@code sessionInfo}
     * @param rangesStr textual ranges, parsed with the configured partitioner
     * @return one string map per reported session
     */
    @Override
    public List<Map<String, String>> getSessions(boolean all, String rangesStr) {
        return this.consistent.local.sessionInfo(all, RepairOption.parseRanges(rangesStr, DatabaseDescriptor.getPartitioner()));
    }

    /**
     * Cancels a local consistent repair session.
     *
     * @param session textual session id, parsed with {@code TimeUUID.fromString}
     * @param force   forwarded unchanged to {@code cancelSession}
     */
    @Override
    public void failSession(String session, boolean force) {
        this.consistent.local.cancelSession(TimeUUID.fromString(session), force);
    }

    /**
     * Sets the repair session space; the value is passed on as MiB.
     *
     * @param sizeInMegabytes new size
     * @deprecated since 4.1; see {@link #setRepairSessionSpaceInMiB(int)}
     */
    @Override
    @Deprecated(since="4.1")
    public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes) {
        DatabaseDescriptor.setRepairSessionSpaceInMiB(sizeInMegabytes);
    }

    /**
     * Returns the configured repair session space, read as MiB.
     *
     * @deprecated since 4.1; see {@link #getRepairSessionSpaceInMiB()}
     */
    @Override
    @Deprecated(since="4.1")
    public int getRepairSessionSpaceInMegabytes() {
        return DatabaseDescriptor.getRepairSessionSpaceInMiB();
    }

    /**
     * Sets the repair session space in MiB.
     *
     * @param sizeInMebibytes new size
     * @deprecated since 4.1; see {@link #setRepairSessionSpaceInMiB(int)}
     */
    @Override
    @Deprecated(since="4.1")
    public void setRepairSessionSpaceInMebibytes(int sizeInMebibytes) {
        DatabaseDescriptor.setRepairSessionSpaceInMiB(sizeInMebibytes);
    }

    /**
     * Returns the configured repair session space in MiB.
     *
     * @deprecated since 4.1; see {@link #getRepairSessionSpaceInMiB()}
     */
    @Override
    @Deprecated(since="4.1")
    public int getRepairSessionSpaceInMebibytes() {
        return DatabaseDescriptor.getRepairSessionSpaceInMiB();
    }

    /**
     * Sets the repair session space in MiB.
     *
     * @param sizeInMebibytes new size
     * @throws IllegalArgumentException carrying the message of the {@link ConfigurationException}
     *         raised by the configuration layer
     */
    @Override
    public void setRepairSessionSpaceInMiB(int sizeInMebibytes) {
        try {
            DatabaseDescriptor.setRepairSessionSpaceInMiB(sizeInMebibytes);
        }
        catch (ConfigurationException e) {
            // NOTE(review): only the message is propagated, not the cause; presumably so remote
            // management clients never need the project exception class - confirm before changing.
            throw new IllegalArgumentException(e.getMessage());
        }
    }

    /**
     * Returns the configured repair session space in MiB.
     */
    @Override
    public int getRepairSessionSpaceInMiB() {
        return DatabaseDescriptor.getRepairSessionSpaceInMiB();
    }

    /**
     * Collects repaired-state statistics for each table selected by {@code schemaArgs}.
     *
     * @param schemaArgs  table selection, parsed by {@code SchemaArgsParser}
     * @param rangeString ranges to report on; when null, each keyspace's local replica ranges are used
     * @return one composite per selected table
     */
    @Override
    public List<CompositeData> getRepairStats(List<String> schemaArgs, String rangeString) {
        List<CompositeData> result = new ArrayList<>();
        Set<Range<Token>> requestedRanges = null;
        if (rangeString != null) {
            requestedRanges = RepairOption.parseRanges(rangeString, DatabaseDescriptor.getPartitioner());
        }
        for (ColumnFamilyStore cfs : SchemaArgsParser.parse(schemaArgs)) {
            String keyspace = cfs.getKeyspaceName();
            Set<Range<Token>> ranges = requestedRanges;
            if (ranges == null) {
                ranges = StorageService.instance.getLocalReplicas(keyspace).ranges();
            }
            RepairedState.Stats repaired = this.consistent.local.getRepairedStats(cfs.metadata().id, ranges);
            result.add(RepairStats.fromRepairState(keyspace, cfs.name, repaired).toComposite());
        }
        return result;
    }

    /**
     * Collects pending-repair statistics for each table selected by {@code schemaArgs}.
     *
     * @param schemaArgs  table selection, parsed by {@code SchemaArgsParser}
     * @param rangeString ranges to report on; when null, each keyspace's local replica ranges are used
     * @return one composite per selected table
     */
    @Override
    public List<CompositeData> getPendingStats(List<String> schemaArgs, String rangeString) {
        List<CompositeData> result = new ArrayList<>();
        Set<Range<Token>> requestedRanges = null;
        if (rangeString != null) {
            requestedRanges = RepairOption.parseRanges(rangeString, DatabaseDescriptor.getPartitioner());
        }
        for (ColumnFamilyStore cfs : SchemaArgsParser.parse(schemaArgs)) {
            String keyspace = cfs.getKeyspaceName();
            Set<Range<Token>> ranges = requestedRanges;
            if (ranges == null) {
                ranges = StorageService.instance.getLocalReplicas(keyspace).ranges();
            }
            PendingStats pending = this.consistent.local.getPendingStats(cfs.metadata().id, ranges);
            result.add(pending.toComposite());
        }
        return result;
    }

    /**
     * Runs the local consistent-session cleanup for each table selected by {@code schemaArgs}.
     *
     * @param schemaArgs  table selection, parsed by {@code SchemaArgsParser}
     * @param rangeString ranges to clean up; when null, each keyspace's local replica ranges are used
     * @param force       forwarded unchanged to the cleanup call
     * @return one cleanup summary composite per selected table
     */
    @Override
    public List<CompositeData> cleanupPending(List<String> schemaArgs, String rangeString, boolean force) {
        List<CompositeData> result = new ArrayList<>();
        Set<Range<Token>> requestedRanges = null;
        if (rangeString != null) {
            requestedRanges = RepairOption.parseRanges(rangeString, DatabaseDescriptor.getPartitioner());
        }
        for (ColumnFamilyStore cfs : SchemaArgsParser.parse(schemaArgs)) {
            String keyspace = cfs.getKeyspaceName();
            Set<Range<Token>> ranges = requestedRanges;
            if (ranges == null) {
                ranges = StorageService.instance.getLocalReplicas(keyspace).ranges();
            }
            CleanupSummary summary = this.consistent.local.cleanup(cfs.metadata().id, ranges, force);
            result.add(summary.toComposite());
        }
        return result;
    }

    /**
     * Returns the number of parent repair sessions currently registered.
     */
    @Override
    public int parentRepairSessionsCount() {
        return this.parentRepairSessions.size();
    }

    /**
     * Creates, registers and starts a repair session for one range.
     *
     * @return the started session, or null when the range has no endpoints or no tables were named
     * @throws IllegalArgumentException if paxos repair is requested together with a preview repair
     */
    public RepairSession submitRepairSession(TimeUUID parentRepairSession, CommonRange range, String keyspace, RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair, PreviewKind previewKind, boolean optimiseStreams, boolean repairPaxos, boolean paxosOnly, ExecutorPlus executor, String ... cfnames) {
        if (repairPaxos && previewKind != PreviewKind.NONE) {
            throw new IllegalArgumentException("cannot repair paxos in a preview repair");
        }
        // Nothing to repair without neighbours or without tables.
        if (range.endpoints.isEmpty() || cfnames.length == 0) {
            return null;
        }
        RepairSession session = new RepairSession(this.ctx, parentRepairSession, range, keyspace, parallelismDegree, isIncremental, pullRepair, previewKind, optimiseStreams, repairPaxos, paxosOnly, cfnames);
        // NOTE(review): the coordinator state is dereferenced without a null check, so an entry
        // missing from the cache surfaces as a NullPointerException here.
        CoordinatorState coordinatorState = this.repairs.getIfPresent(parentRepairSession);
        coordinatorState.register(session.state);
        this.sessions.put(session.getId(), session);
        this.registerOnFdAndGossip(session);
        if (session.previewKind == PreviewKind.REPAIRED) {
            LocalSessions.registerListener(session);
        }
        // On completion, forget the session and drop its listener registration.
        session.addListener(() -> {
            this.sessions.remove(session.getId());
            LocalSessions.unregisterListener(session);
        });
        session.start(executor);
        return session;
    }

    /**
     * Returns the configured off-heap merkle tree flag.
     */
    @Override
    public boolean getUseOffheapMerkleTrees() {
        return DatabaseDescriptor.useOffheapMerkleTrees();
    }

    /**
     * Updates the off-heap merkle tree flag in the configuration.
     *
     * @param value new flag value
     */
    @Override
    public void setUseOffheapMerkleTrees(boolean value) {
        DatabaseDescriptor.useOffheapMerkleTrees(value);
    }

    /**
     * Subscribes the task to gossip and failure-detector events, and arranges for both
     * subscriptions to be removed when the task completes.
     *
     * @param task a future that also listens to gossip and failure-detection events
     */
    private <T extends Future & IEndpointStateChangeSubscriber> void registerOnFdAndGossip(final T task) {
        this.ctx.gossiper().register(task);
        IFailureDetectionEventListener fdListener = (IFailureDetectionEventListener)task;
        this.ctx.failureDetector().registerFailureDetectionEventListener(fdListener);
        // Unregister in the reverse order once the task is done.
        task.addListener(() -> {
            this.ctx.failureDetector().unregisterFailureDetectionEventListener(fdListener);
            this.ctx.gossiper().unregister(task);
        });
    }

    /**
     * Force-shuts every running repair session with a shared {@link IOException} as the
     * reason, then forgets all parent repair sessions.
     */
    public synchronized void terminateSessions() {
        IOException reason = new IOException("Terminate session is called");
        this.sessions.values().forEach(session -> session.forceShutdown(reason));
        this.parentRepairSessions.clear();
    }

    /**
     * Stores the status and messages of a repair command in the status cache.
     *
     * @param cmd                repair command number used as the cache key
     * @param parentRepairStatus status to record
     * @param messages           messages recorded with the status
     */
    public void recordRepairStatus(int cmd, ParentRepairStatus parentRepairStatus, List<String> messages) {
        Pair<ParentRepairStatus, List<String>> entry = Pair.create(parentRepairStatus, messages);
        this.repairStatusByCmd.put(cmd, entry);
    }

    /**
     * Looks up the recorded status of a repair command.
     *
     * @param cmd repair command number
     * @return the recorded status and messages, or null if nothing is cached for {@code cmd}
     */
    @VisibleForTesting
    public Pair<ParentRepairStatus, List<String>> getRepairStatus(Integer cmd) {
        return this.repairStatusByCmd.getIfPresent(cmd);
    }

    /**
     * Returns the replicas, other than this node, that share the range to repair.
     *
     * @param keyspaceName        keyspace whose range-to-replica map is consulted
     * @param keyspaceLocalRanges local ranges; {@code toRepair} must be fully contained in one of them
     * @param toRepair            the range to repair
     * @param dataCenters         when non-empty, only replicas in these datacenters are returned
     * @param hosts               when non-empty (and {@code dataCenters} is empty), only these hosts
     *                            are returned; this node must be among them
     * @return the matching neighbours; empty when no local range contains {@code toRepair} or the
     *         containing range has no replica set
     * @throws IllegalArgumentException if {@code toRepair} only partially overlaps a local range, a
     *         host name cannot be resolved, this node is not in {@code hosts}, or none of the other
     *         given hosts shares the range
     */
    public EndpointsForRange getNeighbors(String keyspaceName, Iterable<Range<Token>> keyspaceLocalRanges, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts) {
        StorageService ss = StorageService.instance;
        EndpointsByRange replicaSets = ss.getRangeToAddressMap(keyspaceName);
        Range<Token> rangeSuperSet = null;
        for (Range<Token> range : keyspaceLocalRanges) {
            // Range-vs-range containment. The previous text cast the range to a Token, which
            // fails with ClassCastException before any comparison happens.
            if (range.contains(toRepair)) {
                rangeSuperSet = range;
                break;
            }
            if (range.intersects(toRepair)) {
                throw new IllegalArgumentException(String.format("Requested range %s intersects a local range (%s) but is not fully contained in one; this would lead to imprecise repair. keyspace: %s", toRepair.toString(), range.toString(), keyspaceName));
            }
        }
        if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet)) {
            return EndpointsForRange.empty(toRepair);
        }
        // Every replica of the containing range except this node.
        EndpointsForRange neighbors = (EndpointsForRange)replicaSets.get(rangeSuperSet).filter(r -> !this.ctx.broadcastAddressAndPort().equals(r.endpoint()));
        if (dataCenters != null && !dataCenters.isEmpty()) {
            TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
            Multimap<String, InetAddressAndPort> dcEndpointsMap = topology.getDatacenterEndpoints();
            Iterable<InetAddressAndPort> dcEndpoints = Iterables.concat(Iterables.transform(dataCenters, dcEndpointsMap::get));
            return (EndpointsForRange)neighbors.select(dcEndpoints, true);
        }
        if (hosts != null && !hosts.isEmpty()) {
            Set<InetAddressAndPort> specifiedHost = new HashSet<>();
            for (String host : hosts) {
                try {
                    InetAddressAndPort endpoint = InetAddressAndPort.getByName(host.trim());
                    // Keep only this node itself and hosts that really are neighbours for the range.
                    if (endpoint.equals(this.ctx.broadcastAddressAndPort()) || neighbors.endpoints().contains(endpoint)) {
                        specifiedHost.add(endpoint);
                    }
                }
                catch (UnknownHostException e) {
                    throw new IllegalArgumentException("Unknown host specified " + host, e);
                }
            }
            if (!specifiedHost.contains(this.ctx.broadcastAddressAndPort())) {
                throw new IllegalArgumentException("The current host must be part of the repair");
            }
            // Only this node is left, so no specified host shares the range.
            if (specifiedHost.size() <= 1) {
                String msg = "Specified hosts %s do not share range %s needed for repair. Either restrict repair ranges with -st/-et options, or specify one of the neighbors that share this range with this node: %s.";
                throw new IllegalArgumentException(String.format(msg, hosts, toRepair, neighbors));
            }
            specifiedHost.remove(this.ctx.broadcastAddressAndPort());
            return (EndpointsForRange)neighbors.keep(specifiedHost);
        }
        return neighbors;
    }

    /**
     * Picks the repaired-at timestamp for a repair.
     *
     * @param options repair options
     * @param force   whether the repair is forced
     * @return the current time in milliseconds for a non-forced, global, incremental repair;
     *         otherwise {@link #UNREPAIRED_SSTABLE}
     */
    long getRepairedAt(RepairOption options, boolean force) {
        boolean marksRepaired = options.isIncremental() && options.isGlobal() && !force;
        return marksRepaired ? this.ctx.clock().currentTimeMillis() : UNREPAIRED_SSTABLE;
    }

    /**
     * Checks the number of pending compactions against the reject threshold.
     *
     * @param parentRepairSession parent session id, used for the log prefix
     * @param previewKind         preview kind, used for the log prefix
     * @return true when pending compactions do not exceed the threshold; false (after logging an
     *         error) otherwise
     */
    public boolean verifyCompactionsPendingThreshold(TimeUUID parentRepairSession, PreviewKind previewKind) {
        int pendingCompactions = this.ctx.compactionManager().getPendingTasks();
        int pendingThreshold = this.getRepairPendingCompactionRejectThreshold();
        if (pendingCompactions <= pendingThreshold) {
            return true;
        }
        logger.error("[{}] Rejecting incoming repair, pending compactions ({}) above threshold ({})", previewKind.logPrefix(parentRepairSession), pendingCompactions, pendingThreshold);
        return false;
    }

    /**
     * Registers the parent repair session locally and sends a prepare message to every live
     * endpoint.
     *
     * @param parentRepairSession parent session id
     * @param coordinator         coordinator recorded with the parent session
     * @param endpoints           participants to prepare
     * @param options             repair options (ranges, incremental/global flags, preview kind)
     * @param isForcedRepair      when true and the repair is not incremental, dead endpoints are
     *                            skipped instead of failing the repair
     * @param columnFamilyStores  tables taking part in the repair
     * @return a future that succeeds once every contacted endpoint acknowledged, and fails on a
     *         negative reply, a request timeout, or when the overall timeout task fires first
     * @throws RuntimeException if pending compactions exceed the threshold or an endpoint is down
     *                          and may not be skipped
     */
    public Future<?> prepareForRepair(TimeUUID parentRepairSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> endpoints, RepairOption options, boolean isForcedRepair, List<ColumnFamilyStore> columnFamilyStores) {
        if (!this.verifyCompactionsPendingThreshold(parentRepairSession, options.getPreviewKind())) {
            // failRepair always throws.
            this.failRepair(parentRepairSession, "Rejecting incoming repair, pending compactions above threshold");
        }
        long repairedAt = this.getRepairedAt(options, isForcedRepair);
        this.registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());

        AtomicInteger pending = new AtomicInteger(endpoints.size());
        Set<String> failedNodes = Collections.synchronizedSet(new HashSet<>());
        AsyncPromise<Void> promise = new AsyncPromise<>();

        List<TableId> tableIds = new ArrayList<>(columnFamilyStores.size());
        for (ColumnFamilyStore cfs : columnFamilyStores) {
            tableIds.add(cfs.metadata.id);
        }
        PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());
        this.register(new ParticipateState(this.ctx.clock(), this.ctx.broadcastAddressAndPort(), message));

        for (InetAddressAndPort neighbour : endpoints) {
            if (this.ctx.failureDetector().isAlive(neighbour)) {
                this.sendPrepareWithRetries(parentRepairSession, pending, failedNodes, promise, neighbour, message);
            } else if (isForcedRepair && !options.isIncremental()) {
                // A forced, non-incremental repair tolerates a dead endpoint: stop waiting for it.
                pending.decrementAndGet();
            } else {
                this.failRepair(parentRepairSession, "Endpoint not alive: " + neighbour);
            }
        }

        // Bound how long the returned future may stay incomplete.
        long timeoutMillis = DatabaseDescriptor.getRepairRetrySpec().isEnabled() ? DatabaseDescriptor.getRepairRpcTimeout(TimeUnit.MILLISECONDS) : DatabaseDescriptor.getRpcTimeout(TimeUnit.MILLISECONDS);
        this.ctx.optionalTasks().schedule(() -> {
            if (promise.isDone()) {
                return;
            }
            String errorMsg = "Did not get replies from all endpoints.";
            if (promise.tryFailure(new RuntimeException(errorMsg))) {
                this.participateFailed(parentRepairSession, errorMsg);
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return promise;
    }

    /**
     * Sends the prepare message to one endpoint and folds its reply into the shared counters.
     *
     * @param parentRepairSession parent session id, used when building failure exceptions
     * @param pending             number of endpoints still expected to reply
     * @param failedNodes         collects the addresses of endpoints that reported failure
     * @param promise             completed once the last reply arrives, or failed on timeout
     * @param to                  endpoint to contact
     * @param msg                 message to send
     */
    private void sendPrepareWithRetries(final TimeUUID parentRepairSession, final AtomicInteger pending, final Set<String> failedNodes, final AsyncPromise<Void> promise, InetAddressAndPort to, RepairMessage msg) {
        RepairMessage.sendMessageWithRetries(this.ctx, RepairMessage.notDone(promise), msg, Verb.PREPARE_MSG, to, new RequestCallback<Object>(){

            @Override
            public void onResponse(Message<Object> msg) {
                this.ack();
            }

            @Override
            public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) {
                failedNodes.add(from.toString());
                if (failureReason == RequestFailureReason.TIMEOUT) {
                    // A negative counter can never be decremented to zero, so later replies
                    // cannot complete the promise after it has been failed here.
                    pending.set(-1);
                    promise.setFailure(ActiveRepairService.this.failRepairException(parentRepairSession, "Did not get replies from all endpoints."));
                } else {
                    this.ack();
                }
            }

            // Counts one reply; the reply that brings the counter to zero completes the promise.
            private void ack() {
                if (pending.decrementAndGet() == 0) {
                    if (failedNodes.isEmpty()) {
                        promise.setSuccess((Object)null);
                    } else {
                        promise.setFailure(ActiveRepairService.this.failRepairException(parentRepairSession, "Got negative replies from endpoints " + failedNodes));
                    }
                }
            }
        });
    }

    /**
     * Sends a cleanup message for the parent session to every live endpoint, then marks the
     * local participate state, if any, as successful.
     *
     * @param parentRepairSession parent session id to clean up
     * @param endpoints           endpoints to notify; those not reported alive are skipped
     */
    public void cleanUp(final TimeUUID parentRepairSession, Set<InetAddressAndPort> endpoints) {
        for (final InetAddressAndPort endpoint : endpoints) {
            try {
                if (!this.ctx.failureDetector().isAlive(endpoint)) continue;
                CleanupMessage message = new CleanupMessage(parentRepairSession);
                // The callback only logs the outcome.
                RequestCallback loggingCallback = new RequestCallback(){

                    public void onResponse(Message msg) {
                        logger.trace("Successfully cleaned up {} parent repair session on {}.", (Object)parentRepairSession, (Object)endpoint);
                    }

                    @Override
                    public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) {
                        logger.debug("Failed to clean up parent repair session {} on {}. The uncleaned sessions will be removed on a node restart. This should not be a problem unless you see thousands of messages like this.", (Object)parentRepairSession, (Object)endpoint);
                    }
                };
                RepairMessage.sendMessageWithRetries(this.ctx, message, Verb.CLEANUP_MSG, endpoint, loggingCallback);
            }
            catch (Exception exc) {
                // Best effort: a failure for one endpoint must not stop the others.
                logger.warn("Failed to send a clean up message to {}", (Object)endpoint, (Object)exc);
            }
        }
        ParticipateState state = this.participate(parentRepairSession);
        if (state != null) {
            state.phase.success("Cleanup message recieved");
        }
    }

    /**
     * Fails the parent repair session and always throws the resulting exception.
     *
     * @throws RuntimeException carrying {@code errorMsg}
     */
    private void failRepair(TimeUUID parentRepairSession, String errorMsg) {
        throw this.failRepairException(parentRepairSession, errorMsg);
    }

    /**
     * Marks the participate state as failed, removes the parent repair session, and builds
     * (without throwing) the exception describing the failure.
     *
     * @return a new {@link RuntimeException} carrying {@code errorMsg}
     */
    private RuntimeException failRepairException(TimeUUID parentRepairSession, String errorMsg) {
        this.participateFailed(parentRepairSession, errorMsg);
        this.removeParentRepairSession(parentRepairSession);
        return new RuntimeException(errorMsg);
    }

    /**
     * Marks the participate state of the given parent session as failed; does nothing when no
     * such state is found.
     */
    private void participateFailed(TimeUUID parentRepairSession, String errorMsg) {
        ParticipateState participant = this.participate(parentRepairSession);
        if (participant == null) {
            return;
        }
        participant.phase.fail(errorMsg);
    }

    /**
     * Records a parent repair session unless one with the same id already exists. The first
     * call also subscribes this service to gossip and failure-detector events.
     */
    public synchronized void registerParentRepairSession(TimeUUID parentRepairSession, InetAddressAndPort coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind) {
        // Only incremental repairs may carry a repaired-at timestamp.
        assert (isIncremental || repairedAt == UNREPAIRED_SSTABLE);
        if (!this.registeredForEndpointChanges) {
            this.ctx.gossiper().register(this);
            this.ctx.failureDetector().registerFailureDetectionEventListener(this);
            this.registeredForEndpointChanges = true;
        }
        // The session object is only built when the id is not yet known.
        this.parentRepairSessions.computeIfAbsent(parentRepairSession, id -> new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, repairedAt, isGlobal, previewKind));
    }

    /**
     * Looks up a registered parent repair session.
     *
     * @param parentSessionId parent session id
     * @return the registered session, never null
     * @throws NoSuchRepairSessionException if no session is registered under that id
     */
    public ParentRepairSession getParentRepairSession(TimeUUID parentSessionId) throws NoSuchRepairSessionException {
        ParentRepairSession found = this.parentRepairSessions.get(parentSessionId);
        if (found != null) {
            return found;
        }
        throw new NoSuchRepairSessionException(parentSessionId);
    }

    /**
     * Unregisters a parent repair session and, when it took snapshots, clears them
     * asynchronously on the snapshot executor.
     *
     * @param parentSessionId parent session id; its string form is the snapshot name
     * @return the removed session, or null if none was registered under that id
     */
    public synchronized ParentRepairSession removeParentRepairSession(TimeUUID parentSessionId) {
        ParentRepairSession removed = this.parentRepairSessions.remove(parentSessionId);
        if (removed == null || !removed.hasSnapshots.get()) {
            return removed;
        }
        String snapshotName = parentSessionId.toString();
        this.snapshotExecutor.submit(() -> {
            String tables = removed.columnFamilyStores.values().stream().map(cfs -> cfs.metadata().toString()).collect(Collectors.joining(", "));
            logger.info("[repair #{}] Clearing snapshots for {}", (Object)parentSessionId, (Object)tables);
            long startNanos = this.ctx.clock().nanoTime();
            for (ColumnFamilyStore store : removed.columnFamilyStores.values()) {
                if (store.snapshotExists(snapshotName)) {
                    store.clearSnapshot(snapshotName);
                }
            }
            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(this.ctx.clock().nanoTime() - startNanos);
            logger.info("[repair #{}] Cleared snapshots in {}ms", (Object)parentSessionId, (Object)elapsedMillis);
        });
        return removed;
    }

    /**
     * Routes a repair response to the session it belongs to.
     * <p>
     * When the session is unknown, validation and sync responses are answered with an empty
     * response, and any merkle trees carried by a validation response are released.
     *
     * @param message incoming repair message
     */
    public void handleMessage(Message<? extends RepairMessage> message) {
        RepairMessage payload = (RepairMessage)message.payload;
        RepairJobDesc desc = payload.desc;
        RepairSession session = this.sessions.get(desc.sessionId);
        if (session != null) {
            switch (message.verb()) {
                case VALIDATION_RSP:
                    session.validationComplete(desc, message);
                    break;
                case SYNC_RSP:
                    session.syncComplete(desc, message);
                    break;
                default:
                    break;
            }
            return;
        }
        // Unknown session: acknowledge so the sender is not left waiting, then free resources.
        switch (message.verb()) {
            case VALIDATION_RSP:
            case SYNC_RSP:
                this.ctx.messaging().send(message.emptyResponse(), message.from());
                break;
            default:
                break;
        }
        if (payload instanceof ValidationResponse) {
            MerkleTrees trees = ((ValidationResponse)payload).trees;
            if (trees != null) {
                trees.release();
            }
        }
    }

    /** Gossip callback; intentionally a no-op. */
    @Override
    public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {
    }

    /** Gossip callback; intentionally a no-op. */
    @Override
    public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {
    }

    /** Gossip callback; intentionally a no-op. */
    @Override
    public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {
    }

    /** Gossip callback; intentionally a no-op. */
    @Override
    public void onAlive(InetAddressAndPort endpoint, EndpointState state) {
    }

    /** Gossip callback; intentionally a no-op. */
    @Override
    public void onDead(InetAddressAndPort endpoint, EndpointState state) {
    }

    /**
     * Gossip callback: convicts a removed endpoint with the maximum phi value, so the
     * threshold check in {@link #convict} always passes.
     */
    @Override
    public void onRemove(InetAddressAndPort endpoint) {
        this.convict(endpoint, Double.MAX_VALUE);
    }

    /**
     * Gossip callback: convicts a restarted endpoint with the maximum phi value, so the
     * threshold check in {@link #convict} always passes.
     */
    @Override
    public void onRestart(InetAddressAndPort endpoint, EndpointState state) {
        this.convict(endpoint, Double.MAX_VALUE);
    }

    /**
     * Failure-detector callback: removes every parent repair session coordinated by the
     * convicted endpoint.
     *
     * @param ep  convicted endpoint
     * @param phi suspicion level; ignored unless it reaches twice the configured convict threshold
     */
    @Override
    public void convict(InetAddressAndPort ep, double phi) {
        boolean belowThreshold = phi < 2.0 * DatabaseDescriptor.getPhiConvictThreshold();
        if (belowThreshold || this.parentRepairSessions.isEmpty()) {
            return;
        }
        Predicate<ParentRepairSession> coordinatedByEndpoint = prs -> prs.coordinator.equals(ep);
        this.abort(coordinatedByEndpoint, "Removing {} in parent repair sessions");
    }

    /**
     * Returns the configured pending-compaction threshold used by
     * {@link #verifyCompactionsPendingThreshold}.
     */
    @Override
    public int getRepairPendingCompactionRejectThreshold() {
        return DatabaseDescriptor.getRepairPendingCompactionRejectThreshold();
    }

    /**
     * Updates the pending-compaction reject threshold in the configuration.
     *
     * @param value new threshold
     */
    @Override
    public void setRepairPendingCompactionRejectThreshold(int value) {
        DatabaseDescriptor.setRepairPendingCompactionRejectThreshold(value);
    }

    /**
     * Removes every parent repair session accepted by the predicate.
     *
     * @param predicate selects the sessions to remove
     * @param message   log format with one placeholder, filled with the set of removed session
     *                  ids; logged only when at least one session matches
     */
    public void abort(Predicate<ParentRepairSession> predicate, String message) {
        // Collect the ids first, then remove them in a second pass.
        Set<TimeUUID> matching = new HashSet<>();
        for (Map.Entry<TimeUUID, ParentRepairSession> entry : this.parentRepairSessions.entrySet()) {
            if (predicate.test(entry.getValue())) {
                matching.add(entry.getKey());
            }
        }
        if (matching.isEmpty()) {
            return;
        }
        logger.info(message, matching);
        for (TimeUUID parentSessionId : matching) {
            this.removeParentRepairSession(parentSessionId);
        }
    }

    /**
     * Returns the number of parent repair sessions currently registered.
     */
    @VisibleForTesting
    public int parentRepairSessionCount() {
        return this.parentRepairSessions.size();
    }

    /**
     * Returns the number of repair sessions currently tracked.
     */
    @VisibleForTesting
    public int sessionCount() {
        return this.sessions.size();
    }

    /**
     * Starts paxos cleanup for every table of {@code ksName} over each of the supplied ranges.
     * <p>
     * Returns an already-successful future without doing any work when paxos repair is disabled or
     * when {@code ranges} is empty. Otherwise, for each range/table pair, the live natural replicas
     * of the range's right token are computed and one {@link PaxosCleanup} is started against them.
     *
     * @param ksName name of the keyspace whose tables are cleaned up
     * @param ranges token ranges to clean up
     * @param reason description of the topology change; only used in the error message
     * @return a future combining all started cleanups
     * @throws RuntimeException if a range does not have sufficient live nodes, or if more than one
     *                          endpoint is pending for a range and the
     *                          {@code PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE} property is not set
     */
    public Future<?> repairPaxosForTopologyChange(String ksName, Collection<Range<Token>> ranges, String reason) {
        if (!DatabaseDescriptor.paxosRepairEnabled()) {
            logger.warn("Not running paxos repair for topology change because paxos repair has been disabled");
            return ImmediateFuture.success(null);
        }
        if (ranges.isEmpty()) {
            logger.warn("Not running paxos repair for topology change because there are no ranges to repair");
            return ImmediateFuture.success(null);
        }
        List<TableMetadata> tables = Lists.newArrayList(Schema.instance.getKeyspaceMetadata(ksName).tables);
        List<PaxosCleanup> futures = new ArrayList<>(ranges.size() * tables.size());
        Keyspace keyspace = Keyspace.open(ksName);
        AbstractReplicationStrategy replication = keyspace.getReplicationStrategy();
        for (Range<Token> range : ranges) {
            for (TableMetadata table : tables) {
                Set<InetAddressAndPort> endpoints = ((EndpointsForRange)replication.getNaturalReplicas(range.right).filter(FailureDetector.isReplicaAlive)).endpoints();
                if (!PaxosRepair.hasSufficientLiveNodesForTopologyChange(keyspace, range, endpoints)) {
                    // Down endpoints = all natural endpoints minus the live ones. The natural endpoints are
                    // copied into a fresh HashSet before subtracting: the previous code called removeAll()
                    // directly on the endpoints() view (which may not support removal), after a filter that
                    // tested replica objects against a set of addresses and so presumably never matched.
                    Set<InetAddressAndPort> downEndpoints = new HashSet<>(replication.getNaturalReplicas(range.right).endpoints());
                    downEndpoints.removeAll(endpoints);
                    // NOTE(review): the text below tells operators to set the skip_* properties "to false";
                    // confirm that this is the value that actually skips the check.
                    throw new RuntimeException(String.format("Insufficient live nodes to repair paxos for %s in %s for %s.\n"
                                                             + "There must be enough live nodes to satisfy EACH_QUORUM, but the following nodes are down: %s\n"
                                                             + "This check can be skipped by setting either the yaml property skip_paxos_repair_on_topology_change or "
                                                             + "the system property %s to false. The jmx property "
                                                             + "StorageService.SkipPaxosRepairOnTopologyChange can also be set to false to temporarily disable without "
                                                             + "restarting the node\n"
                                                             + "Individual keyspaces can be skipped with the yaml property skip_paxos_repair_on_topology_change_keyspaces, the "
                                                             + "system property %s, or temporarily with the jmx "
                                                             + "property StorageService.SkipPaxosRepairOnTopologyChangeKeyspaces\n"
                                                             + "Skipping this check can lead to paxos correctness issues",
                                                             range, ksName, reason, downEndpoints,
                                                             CassandraRelevantProperties.SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE.getKey(),
                                                             CassandraRelevantProperties.SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_KEYSPACES.getKey()));
                }
                EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken((Token)range.right, ksName);
                if (pending.size() > 1 && !CassandraRelevantProperties.PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getBoolean()) {
                    throw new RuntimeException(String.format("Cannot begin paxos auto repair for %s in %s.%s, multiple pending endpoints exist for range (%s). Set -D%s=true to skip this check", range, table.keyspace, table.name, pending, CassandraRelevantProperties.PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getKey()));
                }
                PaxosCleanup future = PaxosCleanup.cleanup(endpoints, table, Collections.singleton(range), false, ActiveRepairService.repairCommandExecutor());
                futures.add(future);
            }
        }
        return FutureCombiner.allOf(futures);
    }

    /**
     * Returns the paxos repair parallelism as currently held by {@link DatabaseDescriptor}.
     *
     * @return the value reported by {@code DatabaseDescriptor.getPaxosRepairParallelism()}
     */
    @Override
    public int getPaxosRepairParallelism() {
        return DatabaseDescriptor.getPaxosRepairParallelism();
    }

    /**
     * Updates the paxos repair parallelism by forwarding the value to {@link DatabaseDescriptor}.
     *
     * @param v the new parallelism; passed through unchanged
     */
    @Override
    public void setPaxosRepairParallelism(int v) {
        DatabaseDescriptor.setPaxosRepairParallelism(v);
    }

    /**
     * Shuts down the snapshot executor by delegating to {@code ExecutorUtils.shutdownNowAndWait}.
     *
     * @param timeout how long to wait, in {@code unit}s
     * @param unit    the unit of {@code timeout}
     * @throws InterruptedException propagated from {@code ExecutorUtils.shutdownNowAndWait}
     * @throws TimeoutException     propagated from {@code ExecutorUtils.shutdownNowAndWait}
     */
    public void shutdownNowAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        ExecutorUtils.shutdownNowAndWait(timeout, unit, snapshotExecutor);
    }

    /**
     * @return the values of the {@code repairs} cache, i.e. the coordinator states currently held in it
     */
    public Collection<CoordinatorState> coordinators() {
        return repairs.asMap().values();
    }

    /**
     * Looks up a coordinator state in the {@code repairs} cache.
     *
     * @param id the id to look up
     * @return the cached state, or {@code null} if the cache holds no entry for {@code id}
     */
    public CoordinatorState coordinator(TimeUUID id) {
        return repairs.getIfPresent(id);
    }

    /**
     * Stores {@code state} in the {@code repairs} cache under its own id, replacing any entry
     * already stored for that id.
     *
     * @param state the coordinator state to store
     */
    public void register(CoordinatorState state) {
        repairs.put(state.id, state);
    }

    /**
     * Stores {@code state} in the {@code participates} cache unless an entry with the same id is
     * already present.
     * <p>
     * The lookup and the insertion happen while holding the monitor of the {@code participates}
     * cache object, so two callers using this method cannot both insert for the same id.
     *
     * @param state the participate state to store
     * @return {@code true} if {@code state} was stored, {@code false} if an entry for its id already existed
     */
    public boolean register(ParticipateState state) {
        synchronized (this.participates) {
            if (this.participates.getIfPresent(state.id) != null) {
                return false;
            }
            this.participates.put(state.id, state);
            return true;
        }
    }

    /**
     * @return the values of the {@code participates} cache, i.e. the participate states currently held in it
     */
    public Collection<ParticipateState> participates() {
        return participates.asMap().values();
    }

    /**
     * Looks up a participate state in the {@code participates} cache.
     *
     * @param id the id to look up
     * @return the cached state, or {@code null} if the cache holds no entry for {@code id}
     */
    public ParticipateState participate(TimeUUID id) {
        return participates.getIfPresent(id);
    }

    /**
     * Gathers the validation states of every participate state currently held in the
     * {@code participates} cache into a newly created list.
     *
     * @return a new list containing all validation states; empty if there are none
     */
    public Collection<ValidationState> validations() {
        List<ValidationState> collected = new ArrayList<>();
        for (ParticipateState participant : this.participates.asMap().values()) {
            collected.addAll(participant.validations());
        }
        return collected;
    }

    /**
     * Finds the first validation state returned by {@link #validations()} whose id equals {@code id}.
     *
     * @param id the validation id to search for
     * @return the matching validation state, or {@code null} if none matches
     */
    public ValidationState validation(UUID id) {
        return validations().stream()
                            .filter(candidate -> candidate.id.equals(id))
                            .findFirst()
                            .orElse(null);
    }

    // Runs once, when this class is initialized: invokes RepairMetrics.init().
    static {
        RepairMetrics.init();
    }

    /**
     * Describes one parent repair session: the coordinating endpoint, the tables (all belonging to a
     * single keyspace) and token ranges involved, and the flags it was created with.
     */
    public static class ParentRepairSession {
        private final Keyspace keyspace;
        // Participating stores, keyed by the id of their table metadata.
        private final Map<TableId, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
        private final Collection<Range<Token>> ranges;
        public final boolean isIncremental;
        public final boolean isGlobal;
        public final long repairedAt;
        public final InetAddressAndPort coordinator;
        public final PreviewKind previewKind;
        // Starts out false; flipped to true at most once through setHasSnapshots().
        public final AtomicBoolean hasSnapshots = new AtomicBoolean(false);

        /**
         * @param coordinator        endpoint coordinating the repair
         * @param columnFamilyStores stores taking part in the repair; they must all belong to exactly one keyspace
         * @param ranges             token ranges being repaired; the reference is kept, not copied
         * @param isIncremental      value for {@link #isIncremental}
         * @param repairedAt         value for {@link #repairedAt}
         * @param isGlobal           value for {@link #isGlobal}
         * @param previewKind        value for {@link #previewKind}
         * @throws IllegalArgumentException if the stores do not belong to exactly one keyspace
         */
        public ParentRepairSession(InetAddressAndPort coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind) {
            this.coordinator = coordinator;
            Set<Keyspace> keyspaces = new HashSet<>();
            for (ColumnFamilyStore cfs : columnFamilyStores) {
                keyspaces.add(cfs.keyspace);
                this.columnFamilyStores.put(cfs.metadata.id, cfs);
            }
            Preconditions.checkArgument(keyspaces.size() == 1, "repair sessions cannot operate on multiple keyspaces");
            this.keyspace = Iterables.getOnlyElement(keyspaces);
            this.ranges = ranges;
            this.repairedAt = repairedAt;
            this.isIncremental = isIncremental;
            this.isGlobal = isGlobal;
            this.previewKind = previewKind;
        }

        /**
         * @return {@code true} unless the preview kind is {@code PreviewKind.NONE}
         */
        public boolean isPreview() {
            return this.previewKind != PreviewKind.NONE;
        }

        /**
         * @return an immutable snapshot of the participating stores
         */
        public Collection<ColumnFamilyStore> getColumnFamilyStores() {
            // copyOf keeps the element type; an untyped ImmutableSet.builder() chain would infer
            // ImmutableSet<Object>, which does not match the declared return type.
            return ImmutableSet.copyOf(this.columnFamilyStores.values());
        }

        /**
         * @return the single keyspace all participating stores belong to
         */
        public Keyspace getKeyspace() {
            return this.keyspace;
        }

        /**
         * @return an immutable set holding the table metadata id of each participating store
         */
        public Set<TableId> getTableIds() {
            return ImmutableSet.copyOf(Iterables.transform(this.getColumnFamilyStores(), cfs -> cfs.metadata.id));
        }

        /**
         * @return an immutable copy of the ranges this session was created with
         */
        public Set<Range<Token>> getRanges() {
            return ImmutableSet.copyOf(this.ranges);
        }

        @Override
        public String toString() {
            return "ParentRepairSession{columnFamilyStores=" + this.columnFamilyStores + ", ranges=" + this.ranges + ", repairedAt=" + this.repairedAt + "}";
        }

        /**
         * Atomically switches {@link #hasSnapshots} from {@code false} to {@code true}.
         *
         * @return {@code true} if this call performed the switch, {@code false} if the flag was already set
         */
        public boolean setHasSnapshots() {
            return this.hasSnapshots.compareAndSet(false, true);
        }
    }

    /**
     * Holds the repair command executor in a static field of its own nested class, so the executor
     * is built when this nested class is initialized rather than when the enclosing class is.
     * It is configured from the repair command pool size and pool-full strategy reported by
     * {@link DatabaseDescriptor}.
     */
    public static class RepairCommandExecutorHandle {
        private static final ExecutorPlus repairCommandExecutor = ActiveRepairService.initializeExecutor(DatabaseDescriptor.getRepairCommandPoolSize(), DatabaseDescriptor.getRepairCommandPoolFullStrategy());
    }

    /**
     * Holds an {@link ActiveRepairService} instance in a static field; the instance is constructed
     * when this nested class is initialized. Presumably this backs the service's singleton
     * accessor, which is not visible in this part of the file.
     */
    private static class Holder {
        private static final ActiveRepairService instance = new ActiveRepairService();

        // Not meant to be instantiated; it only carries the static field above.
        private Holder() {
        }
    }

    /**
     * Pairs a {@link LocalSessions} and a {@link CoordinatorSessions}, both created from the same
     * {@link SharedContext}.
     */
    public static class ConsistentSessions {
        public final LocalSessions local;
        public final CoordinatorSessions coordinated;

        /**
         * @param ctx the context handed to both session trackers
         */
        public ConsistentSessions(SharedContext ctx) {
            local = new LocalSessions(ctx);
            coordinated = new CoordinatorSessions(ctx);
        }
    }

    /**
     * Status values for a parent repair.
     */
    public enum ParentRepairStatus {
        IN_PROGRESS,
        COMPLETED,
        FAILED
    }
}

