/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.RepairFuture;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.RepairSession;
import org.apache.cassandra.repair.messages.AnticompactionRequest;
import org.apache.cassandra.repair.messages.PrepareMessage;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.SyncComplete;
import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ActiveRepairService {
    private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
    /** Shared instance of this service. */
    public static final ActiveRepairService instance = new ActiveRepairService();
    /** NOTE(review): presumably the repairedAt value that marks an sstable as never repaired; not used in this file - confirm. */
    public static final long UNREPAIRED_SSTABLE = 0L;
    /**
     * Executor on which the {@link RepairFuture} tasks created by this class are run.
     * NOTE(review): the meaning of the numeric arguments (4, 60 seconds) is defined by
     * JMXConfigurableThreadPoolExecutor; presumably pool size and keep-alive - confirm.
     */
    private static final ThreadPoolExecutor executor = new JMXConfigurableThreadPoolExecutor(4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AntiEntropySessions"), "internal");
    /** Repair sessions keyed by session id; consulted by handleMessage to route incoming repair messages. */
    private final ConcurrentMap<UUID, RepairSession> sessions = new ConcurrentHashMap<UUID, RepairSession>();
    /** Parent repair sessions keyed by parent session id; entries are added by registerParentRepairSession. */
    private final ConcurrentMap<UUID, ParentRepairSession> parentRepairSessions = new ConcurrentHashMap<UUID, ParentRepairSession>();

    /**
     * No-op constructor. It is protected, so outside this class (which builds the shared
     * static instance with it) only subclasses and same-package code can instantiate the service.
     */
    protected ActiveRepairService() {
    }

    /**
     * Builds a repair session for the given range and queues it on the repair executor.
     *
     * @param parentRepairSession id of the parent session this session belongs to
     * @param range               token range to repair
     * @param keyspace            keyspace being repaired
     * @param isSequential        passed through to the {@link RepairSession} constructor
     * @param endpoints           endpoints passed through to the {@link RepairSession} constructor
     * @param cfnames             column families to repair
     * @return the future wrapping the queued session, or {@code null} when the created
     *         session has no endpoints (in which case nothing is queued)
     */
    public RepairFuture submitRepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String ... cfnames) {
        final RepairSession repairSession = new RepairSession(parentRepairSession, range, keyspace, isSequential, endpoints, cfnames);
        final boolean hasEndpoints = !repairSession.endpoints.isEmpty();
        if (hasEndpoints) {
            final RepairFuture task = new RepairFuture(repairSession);
            executor.execute(task);
            return task;
        }
        // No endpoints on the session: nothing is submitted.
        return null;
    }

    /**
     * Records the session under its id and subscribes it to gossip and
     * failure-detection notifications.
     *
     * @param session the session to start tracking
     */
    public void addToActiveSessions(RepairSession session) {
        final UUID sessionId = session.getId();
        sessions.put(sessionId, session);
        // Registration order is kept: gossiper first, then the failure detector.
        Gossiper.instance.register(session);
        FailureDetector.instance.registerFailureDetectionEventListener(session);
    }

    /**
     * Unsubscribes the session from failure-detection and gossip notifications,
     * then stops tracking it.
     *
     * @param session the session to stop tracking
     */
    public void removeFromActiveSessions(RepairSession session) {
        // Mirror image of addToActiveSessions: listeners go first, the map entry last.
        FailureDetector.instance.unregisterFailureDetectionEventListener(session);
        Gossiper.instance.unregister(session);
        final UUID sessionId = session.getId();
        sessions.remove(sessionId);
    }

    /**
     * Forces every tracked repair session to shut down and forgets all parent
     * repair sessions.
     */
    public void terminateSessions() {
        final Iterator<RepairSession> active = sessions.values().iterator();
        while (active.hasNext()) {
            active.next().forceShutdown();
        }
        parentRepairSessions.clear();
    }

    /**
     * Creates a session directly from a job description, tracks it under its id and
     * queues it on the repair executor. The neighbours are computed with no
     * data-center or host restriction.
     *
     * @param desc job description supplying the ids, range, keyspace and column family
     * @return the future wrapping the queued session
     */
    RepairFuture submitArtificialRepairSession(RepairJobDesc desc) {
        final Set<InetAddress> peers = new HashSet<InetAddress>(getNeighbors(desc.keyspace, desc.range, null, null));
        final String[] columnFamilies = new String[]{desc.columnFamily};
        final RepairSession artificial = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, false, peers, columnFamilies);
        sessions.put(artificial.getId(), artificial);

        final RepairFuture task = new RepairFuture(artificial);
        executor.execute(task);
        return task;
    }

    /**
     * Returns the replicas, other than this node, that share the local range containing
     * {@code toRepair}.
     *
     * <p>The result is optionally narrowed: when {@code dataCenters} is non-null only
     * replicas in those data centers are kept; otherwise, when {@code hosts} is non-null,
     * only the listed hosts that are actual neighbours are kept.
     *
     * @param keyspaceName keyspace whose replication is consulted
     * @param toRepair     range to repair; must be fully contained in one local range
     * @param dataCenters  data centers to restrict to, or {@code null} for no restriction
     * @param hosts        host names/addresses to restrict to, or {@code null} for no restriction;
     *                     only consulted when {@code dataCenters} is {@code null}
     * @return the neighbouring endpoints; empty when no local range contains {@code toRepair}
     *         or no replica set is known for that range
     * @throws IllegalArgumentException if the local data center is missing from {@code dataCenters},
     *         if {@code toRepair} intersects a local range without being contained in it,
     *         if a host cannot be resolved, if the current host is not in {@code hosts},
     *         or if {@code hosts} leaves no neighbour besides the current host
     */
    public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts) {
        if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter())) {
            throw new IllegalArgumentException("The local data center must be part of the repair");
        }
        StorageService ss = StorageService.instance;
        Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
        Range<Token> rangeSuperSet = null;
        for (Range<Token> range : ss.getLocalRanges(keyspaceName)) {
            // Range-in-range containment. The decompiled code cast the range to Token here,
            // which can only fail with ClassCastException.
            if (range.contains(toRepair)) {
                rangeSuperSet = range;
                break;
            }
            if (range.intersects(toRepair)) {
                throw new IllegalArgumentException("Requested range intersects a local range but is not fully contained in one; this would lead to imprecise repair");
            }
        }
        if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet)) {
            return Collections.emptySet();
        }

        InetAddress localAddress = FBUtilities.getBroadcastAddress();
        Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet));
        neighbors.remove(localAddress);

        if (dataCenters != null) {
            TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
            Set<InetAddress> dcEndpoints = Sets.newHashSet();
            Multimap<String, InetAddress> dcEndpointsMap = topology.getDatacenterEndpoints();
            for (String dc : dataCenters) {
                Collection<InetAddress> c = dcEndpointsMap.get(dc);
                if (c != null) {
                    dcEndpoints.addAll(c);
                }
            }
            return Sets.intersection(neighbors, dcEndpoints);
        }

        if (hosts != null) {
            Set<InetAddress> specifiedHost = new HashSet<InetAddress>();
            for (String host : hosts) {
                try {
                    InetAddress endpoint = InetAddress.getByName(host.trim());
                    // Keep the local node (checked below) and any real neighbour; drop everything else.
                    if (endpoint.equals(localAddress) || neighbors.contains(endpoint)) {
                        specifiedHost.add(endpoint);
                    }
                }
                catch (UnknownHostException e) {
                    throw new IllegalArgumentException("Unknown host specified " + host, e);
                }
            }
            if (!specifiedHost.contains(localAddress)) {
                throw new IllegalArgumentException("The current host must be part of the repair");
            }
            if (specifiedHost.size() <= 1) {
                String msg = "Repair requires at least two endpoints that are neighbours before it can continue, the endpoint used for this repair is %s, other available neighbours are %s but these neighbours were not part of the supplied list of hosts to use during the repair (%s).";
                throw new IllegalArgumentException(String.format(msg, specifiedHost, neighbors, hosts));
            }
            // The caller wants neighbours only, so the local node is removed again.
            specifiedHost.remove(localAddress);
            return specifiedHost;
        }
        return neighbors;
    }

    /**
     * Registers a new parent repair session locally, sends a prepare message to every
     * endpoint and waits (up to one hour) for all of them to answer.
     *
     * <p>On any failure the parent session registered here is removed again before the
     * exception is thrown.
     *
     * @param endpoints          endpoints that must acknowledge the prepare message
     * @param ranges             ranges covered by the repair
     * @param columnFamilyStores column families covered by the repair
     * @return the id of the newly registered parent repair session
     * @throws RuntimeException if the wait is interrupted, if not every endpoint answered
     *                          within the timeout, or if any endpoint reported a failure
     */
    public UUID prepareForRepair(Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores) {
        UUID parentRepairSession = UUIDGen.getTimeUUID();
        this.registerParentRepairSession(parentRepairSession, columnFamilyStores, ranges);
        final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
        final AtomicBoolean status = new AtomicBoolean(true);
        IAsyncCallbackWithFailure callback = new IAsyncCallbackWithFailure(){

            @Override
            public void response(MessageIn msg) {
                prepareLatch.countDown();
            }

            @Override
            public boolean isLatencyForSnitch() {
                return false;
            }

            @Override
            public void onFailure(InetAddress from) {
                // A failed endpoint still counts down so the waiter is released promptly.
                status.set(false);
                prepareLatch.countDown();
            }
        };
        List<UUID> cfIds = new ArrayList<UUID>(columnFamilyStores.size());
        for (ColumnFamilyStore cfs : columnFamilyStores) {
            cfIds.add(cfs.metadata.cfId);
        }
        for (InetAddress neighbour : endpoints) {
            PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, ranges);
            MessageOut<RepairMessage> msg = message.createMessage();
            MessagingService.instance().sendRRWithFailure(msg, neighbour, callback);
        }
        boolean allReplied;
        try {
            // await returns false on timeout; that must not be mistaken for success.
            allReplied = prepareLatch.await(1L, TimeUnit.HOURS);
        }
        catch (InterruptedException e) {
            this.parentRepairSessions.remove(parentRepairSession);
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Did not get replies from all endpoints.", e);
        }
        if (!allReplied) {
            this.parentRepairSessions.remove(parentRepairSession);
            throw new RuntimeException("Did not get replies from all endpoints.");
        }
        if (!status.get()) {
            this.parentRepairSessions.remove(parentRepairSession);
            throw new RuntimeException("Did not get positive replies from all endpoints.");
        }
        return parentRepairSession;
    }

    /**
     * Records a parent repair session together with, per column family, the sstables
     * that overlap the given ranges and are not already marked repaired. The current
     * wall-clock time is stored as the session's repairedAt value.
     *
     * @param parentRepairSession id under which the session is stored
     * @param columnFamilyStores  column families taking part in the repair
     * @param ranges              ranges being repaired
     */
    public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges) {
        final Map<UUID, Set<SSTableReader>> unrepairedByCf = new HashMap<UUID, Set<SSTableReader>>();
        for (ColumnFamilyStore store : columnFamilyStores) {
            final Set<SSTableReader> candidates = new HashSet<SSTableReader>();
            for (SSTableReader reader : store.getSSTables()) {
                // Span of the sstable from its first to its last token.
                final AbstractBounds span = new Bounds<Token>(reader.first.getToken(), reader.last.getToken());
                if (span.intersects(ranges) && !reader.isRepaired()) {
                    candidates.add(reader);
                }
            }
            unrepairedByCf.put(store.metadata.cfId, candidates);
        }
        final long registeredAt = System.currentTimeMillis();
        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, unrepairedByCf, registeredAt));
    }

    /**
     * Finishes a parent repair session: sends a one-way anticompaction request to each
     * neighbour, runs anticompaction locally and waits for it, and finally drops the
     * parent session whether or not the local anticompaction succeeded.
     *
     * <p>NOTE(review): the decompiler reported that it removed a self-catching try block
     * from this method, so exception handling may differ from the original source.
     *
     * @param parentSession id of the parent session to finish
     * @param neighbors     endpoints that should anticompact as well
     */
    public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors) throws InterruptedException, ExecutionException, IOException {
        for (InetAddress peer : neighbors) {
            final MessageOut<RepairMessage> request = new AnticompactionRequest(parentSession).createMessage();
            MessagingService.instance().sendOneWay(request, peer);
        }
        try {
            final List<Future<?>> pending = doAntiCompaction(parentSession);
            FBUtilities.waitOnFutures(pending);
        }
        finally {
            parentRepairSessions.remove(parentSession);
        }
    }

    /**
     * Looks up a registered parent repair session.
     *
     * @param parentSessionId id of the parent session
     * @return the registered session, or {@code null} when none is stored under that id
     */
    public ParentRepairSession getParentRepairSession(UUID parentSessionId) {
        final ParentRepairSession registered = parentRepairSessions.get(parentSessionId);
        return registered;
    }

    /**
     * Submits one anticompaction task per column family of the given parent repair
     * session and returns the resulting futures without waiting for them.
     *
     * <p>For each column family the session's sstables are referenced, any that are
     * currently compacting are dropped (and their reference released), and the rest are
     * marked compacting before the task is submitted.
     *
     * @param parentRepairSession id of a registered parent repair session; must not be null
     * @return the futures of the submitted anticompaction tasks, one per column family
     * @throws IllegalStateException if no parent repair session is registered under the id
     */
    public List<Future<?>> doAntiCompaction(UUID parentRepairSession) throws InterruptedException, ExecutionException, IOException {
        assert (parentRepairSession != null);
        ParentRepairSession prs = this.getParentRepairSession(parentRepairSession);
        if (prs == null) {
            // Previously surfaced as a bare NullPointerException on the field access below.
            throw new IllegalStateException("No parent repair session registered with id " + parentRepairSession);
        }
        List<Future<?>> futures = new ArrayList<Future<?>>(prs.columnFamilyStores.size());
        for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet()) {
            Set<SSTableReader> sstables = new HashSet<SSTableReader>(prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()));
            ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
            boolean success = false;
            // Retry until the remaining sstables can all be marked compacting (or none remain).
            while (!success) {
                for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting()) {
                    if (sstables.remove(compactingSSTable)) {
                        // Already being compacted elsewhere: give back the reference taken above.
                        SSTableReader.releaseReferences(Arrays.asList(compactingSSTable));
                    }
                }
                success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
            }
            futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
        }
        return futures;
    }

    /**
     * Routes an incoming repair message to the session named in its job description.
     * Messages for unknown sessions are dropped, and message types other than
     * validation-complete and sync-complete are ignored.
     *
     * @param endpoint sender of the message
     * @param message  the repair message to dispatch
     */
    public void handleMessage(InetAddress endpoint, RepairMessage message) {
        final RepairJobDesc jobDesc = message.desc;
        final RepairSession target = this.sessions.get(jobDesc.sessionId);
        if (target != null) {
            switch (message.messageType) {
                case VALIDATION_COMPLETE: {
                    final ValidationComplete validated = (ValidationComplete) message;
                    target.validationComplete(jobDesc, endpoint, validated.tree);
                    break;
                }
                case SYNC_COMPLETE: {
                    final SyncComplete synced = (SyncComplete) message;
                    target.syncComplete(jobDesc, synced.nodes, synced.success);
                    break;
                }
                default: {
                    break;
                }
            }
        }
    }

    /**
     * State kept for one parent repair session: the column families involved (keyed by
     * cfId), the ranges being repaired, the sstables selected per column family and the
     * repairedAt timestamp supplied at construction.
     */
    public static class ParentRepairSession {
        public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<UUID, ColumnFamilyStore>();
        public final Collection<Range<Token>> ranges;
        public final Map<UUID, Set<SSTableReader>> sstableMap;
        public final long repairedAt;

        /**
         * @param columnFamilyStores column families of the session; indexed here by their cfId
         * @param ranges             ranges being repaired
         * @param sstables           sstables per cfId; stored as-is, not copied
         * @param repairedAt         timestamp associated with this session
         */
        public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, Map<UUID, Set<SSTableReader>> sstables, long repairedAt) {
            this.ranges = ranges;
            this.sstableMap = sstables;
            this.repairedAt = repairedAt;
            for (ColumnFamilyStore store : columnFamilyStores) {
                this.columnFamilyStores.put(store.metadata.cfId, store);
            }
        }

        /**
         * Acquires a reference on each sstable recorded for the column family. Sstables
         * whose data file no longer exists, or whose reference cannot be acquired, are
         * removed from the recorded set.
         *
         * @param cfId id of the column family
         * @return the recorded set itself (not a copy), now holding only referenced sstables
         */
        public Collection<SSTableReader> getAndReferenceSSTables(UUID cfId) {
            final Set<SSTableReader> tracked = this.sstableMap.get(cfId);
            for (Iterator<SSTableReader> it = tracked.iterator(); it.hasNext(); ) {
                final SSTableReader candidate = it.next();
                final File dataFile = new File(candidate.descriptor.filenameFor(Component.DATA));
                // Short-circuit keeps us from referencing an sstable whose data file is gone.
                if (!dataFile.exists() || !candidate.acquireReference()) {
                    it.remove();
                }
            }
            return tracked;
        }
    }

    /**
     * Repair progress states.
     * NOTE(review): not referenced anywhere in this file; presumably reported to
     * whoever tracks repair progress - confirm against callers.
     */
    public static enum Status {
        STARTED,
        SESSION_SUCCESS,
        SESSION_FAILED,
        FINISHED;

    }
}

