/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.repair;

import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.repair.CommonRange;
import org.apache.cassandra.repair.RepairSession;
import org.apache.cassandra.repair.RepairSessionResult;
import org.apache.cassandra.repair.SystemDistributedKeyspace;
import org.apache.cassandra.repair.consistent.CoordinatorSession;
import org.apache.cassandra.repair.consistent.SyncStatSummary;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.progress.ProgressEvent;
import org.apache.cassandra.utils.progress.ProgressEventNotifier;
import org.apache.cassandra.utils.progress.ProgressEventType;
import org.apache.cassandra.utils.progress.ProgressListener;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RepairRunnable
extends WrappedRunnable
implements ProgressEventNotifier {
    private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
    private final StorageService storageService;
    private final int cmd;
    private final RepairOption options;
    private final String keyspace;
    private final String tag;
    private final AtomicInteger progress = new AtomicInteger();
    private final int totalProgress;
    private final List<ProgressListener> listeners = new ArrayList<ProgressListener>();
    private static final AtomicInteger threadCounter = new AtomicInteger(1);

    public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace) {
        this.storageService = storageService;
        this.cmd = cmd;
        this.options = options;
        this.keyspace = keyspace;
        this.tag = "repair:" + cmd;
        this.totalProgress = 4 + options.getRanges().size();
    }

    @Override
    public void addProgressListener(ProgressListener listener) {
        this.listeners.add(listener);
    }

    @Override
    public void removeProgressListener(ProgressListener listener) {
        this.listeners.remove(listener);
    }

    protected void fireProgressEvent(ProgressEvent event) {
        for (ProgressListener listener : this.listeners) {
            listener.progress(this.tag, event);
        }
    }

    protected void fireErrorAndComplete(int progressCount, int totalProgress, String message) {
        StorageMetrics.repairExceptions.inc();
        String errorMessage = String.format("Repair command #%d failed with error %s", this.cmd, message);
        this.fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, errorMessage));
        String completionMessage = String.format("Repair command #%d finished with error", this.cmd);
        this.fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress, completionMessage));
        this.recordFailure(errorMessage, completionMessage);
    }

    @Override
    protected void runMayThrow() throws Exception {
        boolean force;
        TraceState traceState;
        Iterable<ColumnFamilyStore> validColumnFamilies;
        ActiveRepairService.instance.recordRepairStatus(this.cmd, ActiveRepairService.ParentRepairStatus.IN_PROGRESS, (List<String>)ImmutableList.of());
        UUID parentSession = UUIDGen.getTimeUUID();
        String tag = "repair:" + this.cmd;
        AtomicInteger progress = new AtomicInteger();
        int totalProgress = 4 + this.options.getRanges().size();
        String[] columnFamilies = this.options.getColumnFamilies().toArray(new String[this.options.getColumnFamilies().size()]);
        try {
            validColumnFamilies = this.storageService.getValidColumnFamilies(false, false, this.keyspace, columnFamilies);
            progress.incrementAndGet();
        }
        catch (IOException | IllegalArgumentException e) {
            logger.error("Repair {} failed:", (Object)parentSession, (Object)e);
            this.fireErrorAndComplete(progress.get(), totalProgress, e.getMessage());
            return;
        }
        if (Iterables.isEmpty(validColumnFamilies)) {
            String message = String.format("Empty keyspace, skipping repair: %s", this.keyspace);
            logger.info(message);
            this.fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, 0, 0, message));
            return;
        }
        long startTime = System.currentTimeMillis();
        String message = String.format("Starting repair command #%d (%s), repairing keyspace %s with %s", this.cmd, parentSession, this.keyspace, this.options);
        logger.info(message);
        if (this.options.isTraced()) {
            StringBuilder cfsb = new StringBuilder();
            for (ColumnFamilyStore columnFamilyStore : validColumnFamilies) {
                cfsb.append(", ").append(columnFamilyStore.keyspace.getName()).append(".").append(columnFamilyStore.name);
            }
            UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
            traceState = Tracing.instance.begin("repair", (Map<String, String>)ImmutableMap.of((Object)"keyspace", (Object)this.keyspace, (Object)"columnFamilies", (Object)cfsb.substring(2)));
            message = message + " tracing with " + sessionId;
            this.fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 100, message));
            Tracing.traceRepair(message, new Object[0]);
            traceState.enableActivityNotification(tag);
            for (ProgressListener progressListener : this.listeners) {
                traceState.addProgressListener(progressListener);
            }
            Thread thread = this.createQueryThread(this.cmd, sessionId);
            thread.setName("RepairTracePolling");
            thread.start();
        } else {
            this.fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 100, message));
            traceState = null;
        }
        HashSet allNeighbors = new HashSet();
        ArrayList<CommonRange> commonRanges = new ArrayList<CommonRange>();
        try {
            Set<Range<Token>> set = this.storageService.getLocalReplicas(this.keyspace).ranges();
            for (Range<Token> range : this.options.getRanges()) {
                EndpointsForRange neighbors = ActiveRepairService.getNeighbors(this.keyspace, set, range, this.options.getDataCenters(), this.options.getHosts());
                RepairRunnable.addRangeToNeighbors(commonRanges, range, neighbors);
                allNeighbors.addAll(neighbors.endpoints());
            }
            progress.incrementAndGet();
        }
        catch (IllegalArgumentException illegalArgumentException) {
            logger.error("Repair {} failed:", (Object)parentSession, (Object)illegalArgumentException);
            this.fireErrorAndComplete(progress.get(), totalProgress, illegalArgumentException.getMessage());
            return;
        }
        ArrayList<ColumnFamilyStore> arrayList = new ArrayList<ColumnFamilyStore>();
        try {
            Iterables.addAll(arrayList, validColumnFamilies);
            progress.incrementAndGet();
        }
        catch (IllegalArgumentException illegalArgumentException) {
            logger.error("Repair {} failed:", (Object)parentSession, (Object)illegalArgumentException);
            this.fireErrorAndComplete(progress.get(), totalProgress, illegalArgumentException.getMessage());
            return;
        }
        String[] stringArray = new String[arrayList.size()];
        for (int i = 0; i < arrayList.size(); ++i) {
            stringArray[i] = ((ColumnFamilyStore)arrayList.get((int)i)).name;
        }
        if (!this.options.isPreview()) {
            SystemDistributedKeyspace.startParentRepair(parentSession, this.keyspace, stringArray, this.options);
        }
        if ((force = this.options.isForcedRepair()) && this.options.isIncremental()) {
            HashSet actualNeighbors = Sets.newHashSet((Iterable)Iterables.filter(allNeighbors, FailureDetector.instance::isAlive));
            force = !allNeighbors.equals(actualNeighbors);
            allNeighbors = actualNeighbors;
        }
        try (Timer.Context ctx = Keyspace.open((String)this.keyspace).metric.repairPrepareTime.time();){
            ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, this.options, force, arrayList);
            progress.incrementAndGet();
        }
        catch (Throwable t) {
            logger.error("Repair {} failed:", (Object)parentSession, (Object)t);
            if (!this.options.isPreview()) {
                SystemDistributedKeyspace.failParentRepair(parentSession, t);
            }
            this.fireErrorAndComplete(progress.get(), totalProgress, t.getMessage());
            return;
        }
        if (this.options.isPreview()) {
            this.previewRepair(parentSession, startTime, commonRanges, stringArray);
        } else if (this.options.isIncremental()) {
            this.incrementalRepair(parentSession, startTime, force, traceState, allNeighbors, commonRanges, stringArray);
        } else {
            this.normalRepair(parentSession, startTime, traceState, commonRanges, stringArray);
        }
    }

    private void normalRepair(UUID parentSession, long startTime, TraceState traceState, List<CommonRange> commonRanges, String ... cfnames) {
        ListeningExecutorService executor = this.createExecutor();
        ListenableFuture<List<RepairSessionResult>> allSessions = this.submitRepairSessions(parentSession, false, executor, commonRanges, cfnames);
        final ArrayList<Range<Token>> successfulRanges = new ArrayList<Range<Token>>();
        final AtomicBoolean hasFailure = new AtomicBoolean();
        ListenableFuture repairResult = Futures.transformAsync(allSessions, (AsyncFunction)new AsyncFunction<List<RepairSessionResult>, Object>(){

            public ListenableFuture apply(List<RepairSessionResult> results) {
                for (RepairSessionResult sessionResult : results) {
                    logger.debug("Repair result: {}", results);
                    if (sessionResult != null) {
                        if (sessionResult.skippedReplicas) continue;
                        successfulRanges.addAll(sessionResult.ranges);
                        continue;
                    }
                    hasFailure.compareAndSet(false, true);
                }
                return Futures.immediateFuture(null);
            }
        }, (Executor)MoreExecutors.directExecutor());
        Futures.addCallback((ListenableFuture)repairResult, (FutureCallback)new RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, hasFailure, (ExecutorService)executor), (Executor)MoreExecutors.directExecutor());
    }

    @VisibleForTesting
    static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddressAndPort> liveEndpoints, boolean force) {
        if (!force) {
            return commonRanges;
        }
        ArrayList<CommonRange> filtered = new ArrayList<CommonRange>(commonRanges.size());
        for (CommonRange commonRange : commonRanges) {
            ImmutableSet endpoints = ImmutableSet.copyOf((Iterable)Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
            ImmutableSet transEndpoints = ImmutableSet.copyOf((Iterable)Iterables.filter(commonRange.transEndpoints, liveEndpoints::contains));
            Preconditions.checkState((boolean)endpoints.containsAll((Collection<?>)transEndpoints), (Object)"transEndpoints must be a subset of endpoints");
            if (endpoints.isEmpty()) continue;
            filtered.add(new CommonRange((Set<InetAddressAndPort>)endpoints, (Set<InetAddressAndPort>)transEndpoints, commonRange.ranges));
        }
        Preconditions.checkState((!filtered.isEmpty() ? 1 : 0) != 0, (Object)"Not enough live endpoints for a repair");
        return filtered;
    }

    private void incrementalRepair(UUID parentSession, long startTime, boolean forceRepair, TraceState traceState, Set<InetAddressAndPort> allNeighbors, List<CommonRange> commonRanges, String ... cfnames) {
        ImmutableSet allParticipants = ImmutableSet.builder().addAll(allNeighbors).add((Object)FBUtilities.getBroadcastAddressAndPort()).build();
        List<CommonRange> allRanges = RepairRunnable.filterCommonRanges(commonRanges, (Set<InetAddressAndPort>)allParticipants, forceRepair);
        CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, (Set<InetAddressAndPort>)allParticipants, forceRepair);
        ListeningExecutorService executor = this.createExecutor();
        AtomicBoolean hasFailure = new AtomicBoolean(false);
        ListenableFuture repairResult = coordinatorSession.execute(() -> this.submitRepairSessions(parentSession, true, executor, allRanges, cfnames), hasFailure);
        HashSet<Range<Token>> ranges = new HashSet<Range<Token>>();
        for (Collection range : Iterables.transform(allRanges, cr -> cr.ranges)) {
            ranges.addAll(range);
        }
        Futures.addCallback((ListenableFuture)repairResult, (FutureCallback)new RepairCompleteCallback(parentSession, ranges, startTime, traceState, hasFailure, (ExecutorService)executor), (Executor)MoreExecutors.directExecutor());
    }

    private void previewRepair(final UUID parentSession, final long startTime, List<CommonRange> commonRanges, String ... cfnames) {
        logger.debug("Starting preview repair for {}", (Object)parentSession);
        final ListeningExecutorService executor = this.createExecutor();
        ListenableFuture<List<RepairSessionResult>> allSessions = this.submitRepairSessions(parentSession, false, executor, commonRanges, cfnames);
        Futures.addCallback(allSessions, (FutureCallback)new FutureCallback<List<RepairSessionResult>>(){

            public void onSuccess(List<RepairSessionResult> results) {
                try {
                    String message;
                    PreviewKind previewKind = RepairRunnable.this.options.getPreviewKind();
                    assert (previewKind != PreviewKind.NONE);
                    SyncStatSummary summary = new SyncStatSummary(true);
                    summary.consumeSessionResults(results);
                    if (summary.isEmpty()) {
                        message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync";
                        logger.info(message);
                        RepairRunnable.this.fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, RepairRunnable.this.progress.get(), RepairRunnable.this.totalProgress, message));
                    } else {
                        message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString();
                        logger.info(message);
                        RepairRunnable.this.fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, RepairRunnable.this.progress.get(), RepairRunnable.this.totalProgress, message));
                    }
                    String successMessage = "Repair preview completed successfully";
                    RepairRunnable.this.fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, RepairRunnable.this.progress.get(), RepairRunnable.this.totalProgress, successMessage));
                    String completionMessage = this.complete();
                    ActiveRepairService.instance.recordRepairStatus(RepairRunnable.this.cmd, ActiveRepairService.ParentRepairStatus.COMPLETED, (List<String>)ImmutableList.of((Object)message, (Object)successMessage, (Object)completionMessage));
                }
                catch (Throwable t) {
                    logger.error("Error completing preview repair", t);
                    this.onFailure(t);
                }
            }

            public void onFailure(Throwable t) {
                StorageMetrics.repairExceptions.inc();
                RepairRunnable.this.fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, RepairRunnable.this.progress.get(), RepairRunnable.this.totalProgress, t.getMessage()));
                logger.error("Error completing preview repair", t);
                String completionMessage = this.complete();
                RepairRunnable.this.recordFailure(t.getMessage(), completionMessage);
            }

            private String complete() {
                logger.debug("Preview repair {} completed", (Object)parentSession);
                String duration = DurationFormatUtils.formatDurationWords((long)(System.currentTimeMillis() - startTime), (boolean)true, (boolean)true);
                String message = String.format("Repair preview #%d finished in %s", RepairRunnable.this.cmd, duration);
                RepairRunnable.this.fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, RepairRunnable.this.progress.get(), RepairRunnable.this.totalProgress, message));
                executor.shutdownNow();
                return message;
            }
        }, (Executor)MoreExecutors.directExecutor());
    }

    private ListenableFuture<List<RepairSessionResult>> submitRepairSessions(UUID parentSession, boolean isIncremental, ListeningExecutorService executor, List<CommonRange> commonRanges, String ... cfnames) {
        ArrayList<RepairSession> futures = new ArrayList<RepairSession>(this.options.getRanges().size());
        boolean force = this.options.isForcedRepair() && !isIncremental;
        for (CommonRange commonRange : commonRanges) {
            logger.info("Starting RepairSession for {}", (Object)commonRange);
            RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, commonRange, this.keyspace, this.options.getParallelism(), isIncremental, this.options.isPullRepair(), force, this.options.getPreviewKind(), this.options.optimiseStreams(), executor, cfnames);
            if (session == null) continue;
            Futures.addCallback((ListenableFuture)session, (FutureCallback)new RepairSessionCallback(session), (Executor)MoreExecutors.directExecutor());
            futures.add(session);
        }
        return Futures.successfulAsList(futures);
    }

    private ListeningExecutorService createExecutor() {
        return MoreExecutors.listeningDecorator((ExecutorService)new JMXEnabledThreadPoolExecutor(this.options.getJobThreads(), Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("Repair#" + this.cmd), "internal"));
    }

    private void recordFailure(String failureMessage, String completionMessage) {
        String failure = failureMessage == null ? "unknown failure" : failureMessage;
        String completion = completionMessage == null ? "unknown completion" : completionMessage;
        ActiveRepairService.instance.recordRepairStatus(this.cmd, ActiveRepairService.ParentRepairStatus.FAILED, (List<String>)ImmutableList.of((Object)failure, (Object)completion));
    }

    private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors) {
        Set<InetAddressAndPort> endpoints = neighbors.endpoints();
        Set<InetAddressAndPort> transEndpoints = ((EndpointsForRange)neighbors.filter(Replica::isTransient)).endpoints();
        for (CommonRange commonRange : neighborRangeList) {
            if (!commonRange.matchesEndpoints(endpoints, transEndpoints)) continue;
            commonRange.ranges.add(range);
            return;
        }
        ArrayList<Range<Token>> ranges = new ArrayList<Range<Token>>();
        ranges.add(range);
        neighborRangeList.add(new CommonRange(endpoints, transEndpoints, ranges));
    }

    private Thread createQueryThread(int cmd, final UUID sessionId) {
        return NamedThreadFactory.createThread(new WrappedRunnable(){

            @Override
            public void runMayThrow() throws Exception {
                TraceState.Status status;
                TraceState state = Tracing.instance.get(sessionId);
                if (state == null) {
                    throw new Exception("no tracestate");
                }
                String format = "select event_id, source, source_port, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
                String query = String.format(format, "system_traces", "events");
                SelectStatement statement = (SelectStatement)QueryProcessor.parseStatement(query).prepare(ClientState.forInternalCalls());
                ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
                InetAddressAndPort source = FBUtilities.getBroadcastAddressAndPort();
                HashSet[] seen = new HashSet[]{new HashSet(), new HashSet()};
                int si = 0;
                long tlast = System.currentTimeMillis();
                long minWaitMillis = 125L;
                long maxWaitMillis = 1024000L;
                long timeout = minWaitMillis;
                boolean shouldDouble = false;
                while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED) {
                    if (status == TraceState.Status.IDLE) {
                        timeout = shouldDouble ? Math.min(timeout * 2L, maxWaitMillis) : timeout;
                        shouldDouble = !shouldDouble;
                    } else {
                        timeout = minWaitMillis;
                        shouldDouble = false;
                    }
                    ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000L));
                    long tcur = System.currentTimeMillis();
                    ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur));
                    QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList((Object[])new ByteBuffer[]{sessionIdBytes, tminBytes, tmaxBytes}));
                    ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options, System.nanoTime());
                    UntypedResultSet result = UntypedResultSet.create(rows.result);
                    for (UntypedResultSet.Row r : result) {
                        InetAddressAndPort eventNode;
                        int port = DatabaseDescriptor.getStoragePort();
                        if (r.has("source_port")) {
                            port = r.getInt("source_port");
                        }
                        if (source.equals(eventNode = InetAddressAndPort.getByAddressOverrideDefaults(r.getInetAddress("source"), port))) continue;
                        UUID uuid = r.getUUID("event_id");
                        if (uuid.timestamp() > (tcur - 1000L) * 10000L) {
                            seen[si].add(uuid);
                        }
                        if (seen[si == 0 ? 1 : 0].contains(uuid)) continue;
                        String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
                        RepairRunnable.this.fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
                    }
                    tlast = tcur;
                    si = si == 0 ? 1 : 0;
                    seen[si].clear();
                }
            }
        }, "Repair-Runnable-" + threadCounter.incrementAndGet());
    }

    private class RepairCompleteCallback
    implements FutureCallback<Object> {
        final UUID parentSession;
        final Collection<Range<Token>> successfulRanges;
        final long startTime;
        final TraceState traceState;
        final AtomicBoolean hasFailure;
        final ExecutorService executor;

        public RepairCompleteCallback(UUID parentSession, Collection<Range<Token>> successfulRanges, long startTime, TraceState traceState, AtomicBoolean hasFailure, ExecutorService executor) {
            this.parentSession = parentSession;
            this.successfulRanges = successfulRanges;
            this.startTime = startTime;
            this.traceState = traceState;
            this.hasFailure = hasFailure;
            this.executor = executor;
        }

        public void onSuccess(Object result) {
            String message;
            if (!RepairRunnable.this.options.isPreview()) {
                SystemDistributedKeyspace.successfulParentRepair(this.parentSession, this.successfulRanges);
            }
            if (this.hasFailure.get()) {
                StorageMetrics.repairExceptions.inc();
                message = "Some repair failed";
                RepairRunnable.this.fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, RepairRunnable.this.progress.get(), RepairRunnable.this.totalProgress, message));
            } else {
                message = "Repair completed successfully";
                RepairRunnable.this.fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, RepairRunnable.this.progress.get(), RepairRunnable.this.totalProgress, message));
            }
            String completionMessage = this.repairComplete();
            if (this.hasFailure.get()) {
                RepairRunnable.this.recordFailure(message, completionMessage);
            } else {
                ActiveRepairService.instance.recordRepairStatus(RepairRunnable.this.cmd, ActiveRepairService.ParentRepairStatus.COMPLETED, (List<String>)ImmutableList.of((Object)message, (Object)completionMessage));
            }
        }

        public void onFailure(Throwable t) {
            StorageMetrics.repairExceptions.inc();
            RepairRunnable.this.fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, RepairRunnable.this.progress.get(), RepairRunnable.this.totalProgress, t.getMessage()));
            if (!RepairRunnable.this.options.isPreview()) {
                SystemDistributedKeyspace.failParentRepair(this.parentSession, t);
            }
            String completionMessage = this.repairComplete();
            RepairRunnable.this.recordFailure(t.getMessage(), completionMessage);
        }

        private String repairComplete() {
            ActiveRepairService.instance.removeParentRepairSession(this.parentSession);
            long durationMillis = System.currentTimeMillis() - this.startTime;
            String duration = DurationFormatUtils.formatDurationWords((long)durationMillis, (boolean)true, (boolean)true);
            String message = String.format("Repair command #%d finished in %s", RepairRunnable.this.cmd, duration);
            RepairRunnable.this.fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, RepairRunnable.this.progress.get(), RepairRunnable.this.totalProgress, message));
            logger.info(message);
            if (RepairRunnable.this.options.isTraced() && this.traceState != null) {
                for (ProgressListener listener : RepairRunnable.this.listeners) {
                    this.traceState.removeProgressListener(listener);
                }
                Tracing.instance.set(this.traceState);
                Tracing.traceRepair(message, new Object[0]);
                Tracing.instance.stopSession();
            }
            this.executor.shutdownNow();
            Keyspace.open((String)((RepairRunnable)RepairRunnable.this).keyspace).metric.repairTime.update(durationMillis, TimeUnit.MILLISECONDS);
            return message;
        }
    }

    private class RepairSessionCallback
    implements FutureCallback<RepairSessionResult> {
        private final RepairSession session;

        public RepairSessionCallback(RepairSession session) {
            this.session = session;
        }

        public void onSuccess(RepairSessionResult result) {
            String message = String.format("Repair session %s for range %s finished", this.session.getId(), this.session.ranges().toString());
            logger.info(message);
            RepairRunnable.this.fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS, RepairRunnable.this.progress.incrementAndGet(), RepairRunnable.this.totalProgress, message));
        }

        public void onFailure(Throwable t) {
            StorageMetrics.repairExceptions.inc();
            String message = String.format("Repair session %s for range %s failed with error %s", this.session.getId(), this.session.ranges().toString(), t.getMessage());
            logger.error(message, t);
            RepairRunnable.this.fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, RepairRunnable.this.progress.incrementAndGet(), RepairRunnable.this.totalProgress, message));
        }
    }
}

