/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import com.google.common.base.Objects;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.CompactionManager;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.AbstractCompactedRow;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.StreamIn;
import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.streaming.StreamOutSession;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.WrappedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AntiEntropyService {
    private static final Logger logger = LoggerFactory.getLogger(AntiEntropyService.class);
    public static final AntiEntropyService instance = new AntiEntropyService();
    public static final long REQUEST_TIMEOUT = 172800000L;
    private final ExpiringMap<String, Map<TreeRequest, TreePair>> requests = new ExpiringMap(172800000L);
    private final ConcurrentMap<String, RepairSession.Callback> sessions = new ConcurrentHashMap<String, RepairSession.Callback>();

    protected AntiEntropyService() {
    }

    public RepairSession getRepairSession(String tablename, String ... cfnames) {
        return new RepairSession(tablename, cfnames);
    }

    void completedRequest(TreeRequest request) {
        ((RepairSession.Callback)this.sessions.get(request.sessionid)).completed(request);
    }

    private Map<TreeRequest, TreePair> rendezvousPairs(String sessionid) {
        Map<TreeRequest, TreePair> ctrees = this.requests.get(sessionid);
        if (ctrees == null) {
            ctrees = new HashMap<TreeRequest, TreePair>();
            this.requests.put(sessionid, ctrees);
        }
        return ctrees;
    }

    static Set<InetAddress> getNeighbors(String table) {
        StorageService ss = StorageService.instance;
        HashSet<InetAddress> neighbors = new HashSet<InetAddress>();
        Map<Range, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);
        for (Range range : ss.getLocalRanges(table)) {
            neighbors.addAll((Collection<InetAddress>)replicaSets.get(range));
        }
        neighbors.remove(FBUtilities.getLocalAddress());
        return neighbors;
    }

    private void rendezvous(TreeRequest request, MerkleTree tree) {
        InetAddress LOCAL = FBUtilities.getLocalAddress();
        Map<TreeRequest, TreePair> ctrees = this.rendezvousPairs(request.sessionid);
        ArrayList<Differencer> differencers = new ArrayList<Differencer>();
        if (LOCAL.equals(request.endpoint)) {
            for (InetAddress neighbor : AntiEntropyService.getNeighbors((String)request.cf.left)) {
                TreeRequest remotereq = new TreeRequest(request.sessionid, neighbor, request.cf);
                TreePair waiting = ctrees.remove(remotereq);
                if (waiting != null && waiting.right != null) {
                    differencers.add(new Differencer(remotereq, tree, (MerkleTree)waiting.right));
                    continue;
                }
                ctrees.put(remotereq, new TreePair(tree, null));
                logger.debug("Stored local tree for " + request + " to wait for " + remotereq);
            }
        } else {
            TreePair waiting = ctrees.remove(request);
            if (waiting != null && waiting.left != null) {
                differencers.add(new Differencer(request, (MerkleTree)waiting.left, tree));
            } else {
                ctrees.put(request, new TreePair(null, tree));
                logger.debug("Stored remote tree for " + request + " to wait for local tree.");
            }
        }
        for (Differencer differencer : differencers) {
            logger.info("Queueing comparison " + differencer);
            StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
        }
    }

    TreeRequest request(String sessionid, InetAddress remote, String ksname, String cfname) {
        TreeRequest request = new TreeRequest(sessionid, remote, new CFPair(ksname, cfname));
        MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(request), remote);
        return request;
    }

    void respond(Validator validator, InetAddress local) {
        MessagingService ms = MessagingService.instance();
        try {
            Message message = TreeResponseVerbHandler.makeVerb(local, validator);
            logger.info("Sending AEService tree for " + validator.request);
            ms.sendOneWay(message, validator.request.endpoint);
        }
        catch (Exception e) {
            logger.error("Could not send valid tree for request " + validator.request, (Throwable)e);
        }
    }

    class RepairSession
    extends Thread {
        private final String tablename;
        private final String[] cfnames;
        private final SimpleCondition requestsMade;
        private final ConcurrentHashMap<TreeRequest, Object> requests;

        public RepairSession(String tablename, String ... cfnames) {
            super("manual-repair-" + UUID.randomUUID());
            this.tablename = tablename;
            this.cfnames = cfnames;
            this.requestsMade = new SimpleCondition();
            this.requests = new ConcurrentHashMap();
        }

        public void blockUntilRunning() throws InterruptedException {
            this.requestsMade.await();
        }

        @Override
        public void run() {
            Set<InetAddress> endpoints = AntiEntropyService.getNeighbors(this.tablename);
            if (endpoints.isEmpty()) {
                logger.info("No neighbors to repair with: " + this.getName() + " completed.");
                return;
            }
            Callback callback = new Callback();
            AntiEntropyService.this.sessions.put(this.getName(), callback);
            try {
                for (String cfname : this.cfnames) {
                    for (InetAddress endpoint : endpoints) {
                        this.requests.put(AntiEntropyService.this.request(this.getName(), endpoint, this.tablename, cfname), this);
                    }
                    AntiEntropyService.this.request(this.getName(), FBUtilities.getLocalAddress(), this.tablename, cfname);
                }
                logger.info("Waiting for repair requests: " + this.requests.keySet());
                this.requestsMade.signalAll();
                callback.completed.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while waiting for repair: repair will continue in the background.");
            }
        }

        class Callback {
            public final SimpleCondition completed = new SimpleCondition();

            Callback() {
            }

            public void completed(TreeRequest request) {
                try {
                    RepairSession.this.blockUntilRunning();
                }
                catch (InterruptedException e) {
                    throw new AssertionError((Object)e);
                }
                RepairSession.this.requests.remove(request);
                logger.info("{} completed successfully: {} outstanding.", (Object)request, (Object)RepairSession.this.requests.size());
                if (!RepairSession.this.requests.isEmpty()) {
                    return;
                }
                logger.info("Session " + RepairSession.this.getName() + " completed successfully.");
                AntiEntropyService.this.sessions.remove(RepairSession.this.getName());
                this.completed.signalAll();
            }
        }
    }

    public static class TreeRequest {
        public final String sessionid;
        public final InetAddress endpoint;
        public final CFPair cf;

        public TreeRequest(String sessionid, InetAddress endpoint, CFPair cf) {
            this.sessionid = sessionid;
            this.endpoint = endpoint;
            this.cf = cf;
        }

        public final int hashCode() {
            return Objects.hashCode((Object[])new Object[]{this.sessionid, this.endpoint, this.cf});
        }

        public final boolean equals(Object o) {
            if (!(o instanceof TreeRequest)) {
                return false;
            }
            TreeRequest that = (TreeRequest)o;
            return Objects.equal((Object)this.sessionid, (Object)that.sessionid) && Objects.equal((Object)this.endpoint, (Object)that.endpoint) && Objects.equal((Object)this.cf, (Object)that.cf);
        }

        public String toString() {
            return "#<TreeRequest " + this.sessionid + ", " + this.endpoint + ", " + this.cf + ">";
        }
    }

    static class TreePair
    extends Pair<MerkleTree, MerkleTree> {
        public TreePair(MerkleTree local, MerkleTree remote) {
            super(local, remote);
        }
    }

    static class CFPair
    extends Pair<String, String> {
        public CFPair(String table, String cf) {
            super(table, cf);
            assert (table != null && cf != null);
        }
    }

    public static class TreeResponseVerbHandler
    implements IVerbHandler,
    ICompactSerializer<Validator> {
        public static final TreeResponseVerbHandler SERIALIZER = new TreeResponseVerbHandler();

        static Message makeVerb(InetAddress local, Validator validator) {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(bos);
                SERIALIZER.serialize(validator, dos);
                return new Message(local, StorageService.Verb.TREE_RESPONSE, bos.toByteArray());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void serialize(Validator v, DataOutputStream dos) throws IOException {
            TreeRequestVerbHandler.SERIALIZER.serialize(v.request, dos);
            ObjectOutputStream oos = new ObjectOutputStream(dos);
            oos.writeObject(v.tree);
            oos.flush();
        }

        @Override
        public Validator deserialize(DataInputStream dis) throws IOException {
            TreeRequest request = TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
            ObjectInputStream ois = new ObjectInputStream(dis);
            try {
                return new Validator(request, (MerkleTree)ois.readObject());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void doVerb(Message message, String id) {
            byte[] bytes = message.getMessageBody();
            DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));
            try {
                Validator response = this.deserialize(buffer);
                TreeRequest request = new TreeRequest(response.request.sessionid, message.getFrom(), response.request.cf);
                instance.rendezvous(request, response.tree);
            }
            catch (IOException e) {
                throw new IOError(e);
            }
        }
    }

    public static class TreeRequestVerbHandler
    implements IVerbHandler,
    ICompactSerializer<TreeRequest> {
        public static final TreeRequestVerbHandler SERIALIZER = new TreeRequestVerbHandler();

        static Message makeVerb(TreeRequest request) {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(bos);
                SERIALIZER.serialize(request, dos);
                return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void serialize(TreeRequest request, DataOutputStream dos) throws IOException {
            dos.writeUTF(request.sessionid);
            CompactEndpointSerializationHelper.serialize(request.endpoint, dos);
            dos.writeUTF((String)request.cf.left);
            dos.writeUTF((String)request.cf.right);
        }

        @Override
        public TreeRequest deserialize(DataInputStream dis) throws IOException {
            return new TreeRequest(dis.readUTF(), CompactEndpointSerializationHelper.deserialize(dis), new CFPair(dis.readUTF(), dis.readUTF()));
        }

        @Override
        public void doVerb(Message message, String id) {
            byte[] bytes = message.getMessageBody();
            DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));
            try {
                TreeRequest remotereq = this.deserialize(buffer);
                TreeRequest request = new TreeRequest(remotereq.sessionid, message.getFrom(), remotereq.cf);
                ColumnFamilyStore store = Table.open((String)request.cf.left).getColumnFamilyStore((String)request.cf.right);
                Validator validator = new Validator(request);
                logger.debug("Queueing validation compaction for " + request);
                CompactionManager.instance.submitValidation(store, validator);
            }
            catch (IOException e) {
                throw new IOError(e);
            }
        }
    }

    public static class Differencer
    implements Runnable {
        public final TreeRequest request;
        public final MerkleTree ltree;
        public final MerkleTree rtree;
        public final List<Range> differences;

        public Differencer(TreeRequest request, MerkleTree ltree, MerkleTree rtree) {
            this.request = request;
            this.ltree = ltree;
            this.rtree = rtree;
            this.differences = new ArrayList<Range>();
        }

        @Override
        public void run() {
            InetAddress local = FBUtilities.getLocalAddress();
            StorageService ss = StorageService.instance;
            if (this.ltree.partitioner() == null) {
                this.ltree.partitioner(StorageService.getPartitioner());
            }
            if (this.rtree.partitioner() == null) {
                this.rtree.partitioner(StorageService.getPartitioner());
            }
            HashSet<Range> interesting = new HashSet<Range>(ss.getRangesForEndpoint((String)this.request.cf.left, local));
            interesting.retainAll(ss.getRangesForEndpoint((String)this.request.cf.left, this.request.endpoint));
            for (MerkleTree.TreeRange diff : MerkleTree.difference(this.ltree, this.rtree)) {
                for (Range localrange : interesting) {
                    this.differences.addAll(diff.intersectionWith(localrange));
                }
            }
            String format = "Endpoints " + local + " and " + this.request.endpoint + " %s for " + this.request.cf;
            if (this.differences.isEmpty()) {
                logger.info(String.format(format, "are consistent"));
                instance.completedRequest(this.request);
                return;
            }
            logger.info(String.format(format, "have " + this.differences.size() + " range(s) out of sync"));
            try {
                this.performStreamingRepair();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        void performStreamingRepair() throws IOException {
            logger.info("Performing streaming repair of " + this.differences.size() + " ranges for " + this.request);
            ColumnFamilyStore cfstore = Table.open((String)this.request.cf.left).getColumnFamilyStore((String)this.request.cf.right);
            try {
                ArrayList<Range> ranges = new ArrayList<Range>(this.differences);
                Collection<SSTableReader> sstables = cfstore.getSSTables();
                Callback callback = new Callback();
                StreamOutSession outsession = StreamOutSession.create((String)this.request.cf.left, this.request.endpoint, callback);
                StreamOut.transferSSTables(outsession, sstables, ranges);
                StreamIn.requestRanges(this.request.endpoint, (String)this.request.cf.left, ranges, callback);
            }
            catch (Exception e) {
                throw new IOException("Streaming repair failed.", e);
            }
        }

        public String toString() {
            return "#<Differencer " + this.request + ">";
        }

        class Callback
        extends WrappedRunnable {
            private final AtomicInteger outstanding = new AtomicInteger(2);

            Callback() {
            }

            @Override
            protected void runMayThrow() throws Exception {
                if (this.outstanding.decrementAndGet() > 0) {
                    return;
                }
                logger.info("Finished streaming repair for " + Differencer.this.request);
                instance.completedRequest(Differencer.this.request);
            }
        }
    }

    public static class Validator
    implements Runnable {
        public final TreeRequest request;
        public final MerkleTree tree;
        private transient List<MerkleTree.RowHash> minrows;
        private transient Token mintoken;
        private transient long validated;
        private transient MerkleTree.TreeRange range;
        private transient MerkleTree.TreeRangeIterator ranges;
        public static final MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);

        Validator(TreeRequest request) {
            this(request, new MerkleTree(DatabaseDescriptor.getPartitioner(), 126, (int)Math.pow(2.0, 15.0)));
        }

        Validator(TreeRequest request, MerkleTree tree) {
            this.request = request;
            this.tree = tree;
            this.minrows = new ArrayList<MerkleTree.RowHash>();
            this.mintoken = null;
            this.validated = 0L;
            this.range = null;
            this.ranges = null;
        }

        public void prepare(ColumnFamilyStore cfs) {
            ArrayList<DecoratedKey> keys = new ArrayList<DecoratedKey>();
            for (DecoratedKey sample : cfs.allKeySamples()) {
                keys.add(sample);
            }
            if (keys.isEmpty()) {
                this.tree.init();
            } else {
                DecoratedKey dk;
                int numkeys = keys.size();
                Random random = new Random();
                do {
                    dk = (DecoratedKey)keys.get(random.nextInt(numkeys));
                } while (this.tree.split((Token)dk.token));
            }
            logger.debug("Prepared AEService tree of size " + this.tree.size() + " for " + this.request);
            this.mintoken = this.tree.partitioner().getMinimumToken();
            this.ranges = this.tree.invalids(new Range(this.mintoken, this.mintoken));
        }

        public void add(AbstractCompactedRow row) {
            if (this.mintoken != null) {
                assert (this.ranges != null) : "Validator was not prepared()";
                if (((Token)row.key.token).compareTo(this.mintoken) == 0) {
                    this.minrows.add(this.rowHash(row));
                    return;
                }
                this.mintoken = null;
            }
            if (this.range == null) {
                this.range = (MerkleTree.TreeRange)this.ranges.next();
            }
            while (!this.range.contains((Token)row.key.token)) {
                this.range.addHash(EMPTY_ROW);
                this.range = (MerkleTree.TreeRange)this.ranges.next();
            }
            this.range.addHash(this.rowHash(row));
        }

        private MerkleTree.RowHash rowHash(AbstractCompactedRow row) {
            ++this.validated;
            MessageDigest digest = null;
            try {
                digest = MessageDigest.getInstance("SHA-256");
            }
            catch (NoSuchAlgorithmException e) {
                throw new AssertionError((Object)e);
            }
            row.update(digest);
            return new MerkleTree.RowHash((Token)row.key.token, digest.digest());
        }

        public void complete() {
            assert (this.ranges != null) : "Validator was not prepared()";
            if (this.range != null) {
                this.range.addHash(EMPTY_ROW);
            }
            while (this.ranges.hasNext()) {
                this.range = (MerkleTree.TreeRange)this.ranges.next();
                this.range.addHash(EMPTY_ROW);
            }
            if (!this.minrows.isEmpty()) {
                for (MerkleTree.RowHash minrow : this.minrows) {
                    this.range.addHash(minrow);
                }
            }
            StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
            logger.debug("Validated " + this.validated + " rows into AEService tree for " + this.request);
        }

        @Override
        public void run() {
            instance.respond(this, FBUtilities.getLocalAddress());
        }
    }
}

