/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ArrayBackedSortedColumns;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.HintedHandOffManagerMBean;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RangeSliceCommand;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.metrics.HintedHandoffMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HintedHandOffManager
implements HintedHandOffManagerMBean {
    /** JMX object name under which {@link #start()} registers this manager. */
    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=HintedHandoffManager";
    /** Process-wide singleton. */
    public static final HintedHandOffManager instance = new HintedHandOffManager();
    private static final Logger logger = LoggerFactory.getLogger(HintedHandOffManager.class);
    // NOTE(review): presumably the upper bound on hint columns fetched per page -- confirm against calculatePageSize().
    private static final int PAGE_SIZE = 128;
    // NOTE(review): presumably the row limit for range scans over the hints table -- confirm against getHintsSlice().
    private static final int LARGE_NUMBER = 65536;
    /** Hint metrics; updated when a hint is created and logged by the periodic task set up in start(). */
    public final HintedHandoffMetrics metrics = new HintedHandoffMetrics();
    /** Set through pauseHintsDelivery(boolean); delivery checks it before starting and before each hint. */
    private volatile boolean hintedHandOffPaused = false;
    /** Comparator for hint column names: a (UUID, Int32) composite. hintFor() writes (hint id, 7) and delivery reads component 1 back as the serialization version. */
    static final CompositeType comparator = CompositeType.getInstance(Arrays.asList(UUIDType.instance, Int32Type.instance));
    /** Endpoints that currently have a delivery task queued or running; used by scheduleHintDelivery to avoid scheduling the same endpoint twice. */
    private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet();
    /** Executor on which the hint delivery tasks run. */
    private final ThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getMaxHintsThread(), Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("HintedHandoff", 1), "internal");
    /** Store backing the "hints" column family of the "system" keyspace. */
    private final ColumnFamilyStore hintStore = Keyspace.open("system").getColumnFamilyStore("hints");

    /**
     * Builds the mutation that stores {@code mutation} as a hint for the node identified by
     * {@code targetId}.
     *
     * The hint is a single column in the "hints" column family of the "system" keyspace: the row key
     * is the decomposed target host id, the column name is the composite (new time-UUID, 7) and the
     * column value is {@code mutation} serialized with version 7. The created-hints metric for the
     * target endpoint is incremented as a side effect.
     *
     * @param mutation the mutation to store
     * @param ttl      time-to-live handed to the hint column; must be positive (checked by assertion)
     * @param targetId host id of the node the hint is destined for
     * @return the mutation that writes the hint (not yet applied)
     */
    public RowMutation hintFor(RowMutation mutation, int ttl, UUID targetId) {
        assert (ttl > 0);
        InetAddress targetEndpoint = StorageService.instance.getTokenMetadata().getEndpointForHostId(targetId);
        this.metrics.incrCreatedHints(targetEndpoint);

        // Column name: (hint id, version); column value: the mutation serialized with that version.
        ByteBuffer columnName = comparator.decompose(UUIDGen.getTimeUUID(), 7);
        byte[] serializedMutation = FBUtilities.serialize(mutation, RowMutation.serializer, 7);
        ByteBuffer columnValue = ByteBuffer.wrap(serializedMutation);

        ArrayBackedSortedColumns hintColumns = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData("system", "hints"));
        hintColumns.addColumn(columnName, columnValue, System.currentTimeMillis(), ttl);

        ByteBuffer rowKey = UUIDType.instance.decompose(targetId);
        return new RowMutation("system", rowKey, hintColumns);
    }

    /**
     * Returns the smallest gc-grace period (in seconds) among the column families touched by
     * {@code mutation}, or {@link Integer#MAX_VALUE} when the mutation touches none.
     *
     * @param mutation the mutation a hint is about to be written for
     * @return the minimum gc_grace_seconds across the mutation's column families
     */
    public static int calculateHintTTL(RowMutation mutation) {
        int smallestGcGrace = Integer.MAX_VALUE;
        for (ColumnFamily family : mutation.getColumnFamilies()) {
            int gcGrace = family.metadata().getGcGraceSeconds();
            if (gcGrace < smallestGcGrace) {
                smallestGcGrace = gcGrace;
            }
        }
        return smallestGcGrace;
    }

    /**
     * Registers this manager as a JMX MBean under {@link #MBEAN_NAME} and schedules a recurring task
     * (first run after 10 minutes, then 10 minutes after each completion) that schedules deliveries
     * for all stored hints and logs the hint metrics.
     *
     * @throws RuntimeException wrapping any failure to build the object name or register the MBean
     */
    public void start() {
        MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName mbeanName = new ObjectName(MBEAN_NAME);
            platformServer.registerMBean(this, mbeanName);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.debug("Created HHOM instance, registered MBean.");

        StorageService.optionalTasks.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                HintedHandOffManager.this.scheduleAllDeliveries();
                HintedHandOffManager.this.metrics.log();
            }
        }, 10L, 10L, TimeUnit.MINUTES);
    }

    /**
     * Deletes a single hint column from the "hints" column family of the "system" keyspace and
     * applies the deletion via {@code applyUnsafe()}.
     *
     * @param tokenBytes row key of the hint row
     * @param columnName name of the hint column to delete
     * @param timestamp  timestamp to record on the deletion
     */
    private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp) {
        RowMutation deletion = new RowMutation("system", tokenBytes);
        deletion.delete("hints", columnName, timestamp);
        deletion.applyUnsafe();
    }

    /**
     * Resolves {@code ipOrHostname} and deletes the hints stored for that endpoint.
     *
     * @param ipOrHostname IP address or host name of the node whose hints should be dropped
     * @throws RuntimeException wrapping the {@link UnknownHostException} when the name cannot be resolved
     */
    @Override
    public void deleteHintsForEndpoint(String ipOrHostname) {
        InetAddress resolved;
        try {
            resolved = InetAddress.getByName(ipOrHostname);
        }
        catch (UnknownHostException e) {
            logger.warn("Unable to find {}, not a hostname or ipaddr of a node", (Object)ipOrHostname);
            throw new RuntimeException(e);
        }
        this.deleteHintsForEndpoint(resolved);
    }

    /**
     * Deletes every hint stored for {@code endpoint}, asynchronously.
     *
     * Does nothing when the endpoint is not a member according to the token metadata. Otherwise a
     * row-level deletion of the endpoint's hint row (keyed by its host id) is built immediately and
     * handed to {@code StorageService.optionalTasks}, where it is applied and followed by a
     * compaction of the hints store. Failures in that task are logged, not propagated.
     *
     * @param endpoint the node whose hints should be dropped
     */
    public void deleteHintsForEndpoint(final InetAddress endpoint) {
        boolean knownMember = StorageService.instance.getTokenMetadata().isMember(endpoint);
        if (!knownMember) {
            return;
        }

        UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
        ByteBuffer rowKey = ByteBuffer.wrap(UUIDGen.decompose(hostId));
        final RowMutation rowDeletion = new RowMutation("system", rowKey);
        // The deletion timestamp is taken now, not when the background task eventually runs.
        rowDeletion.delete("hints", System.currentTimeMillis());

        StorageService.optionalTasks.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    logger.info("Deleting any stored hints for {}", (Object)endpoint);
                    rowDeletion.apply();
                    HintedHandOffManager.this.compact();
                }
                catch (Exception e) {
                    logger.warn("Could not delete hints for {}: {}", (Object)endpoint, (Object)e);
                }
            }
        });
    }

    /**
     * Flushes the hints store and submits a user-defined compaction over all of its current
     * sstables, using the current time (in seconds) as the gc-before threshold.
     *
     * @return the future returned by the compaction manager for the submitted compaction
     */
    @VisibleForTesting
    protected Future<?> compact() {
        this.hintStore.forceBlockingFlush();

        List<Descriptor> toCompact = new ArrayList<Descriptor>();
        for (SSTable table : this.hintStore.getSSTables()) {
            toCompact.add(table.descriptor);
        }

        int gcBefore = (int)(System.currentTimeMillis() / 1000L);
        return CompactionManager.instance.submitUserDefined(this.hintStore, toCompact, gcBefore);
    }

    /**
     * Tells whether paging through a hint row is complete.
     *
     * Paging is finished when the page is {@code null}, or when it holds exactly one column and
     * that column is the one named {@code startColumn} (i.e. the page only repeats the column the
     * slice started from).
     *
     * @param hintColumnFamily the page just read; may be {@code null}
     * @param startColumn      the column name the page's slice started from
     * @return {@code true} when there is nothing further to page through
     */
    private static boolean pagingFinished(ColumnFamily hintColumnFamily, ByteBuffer startColumn) {
        if (hintColumnFamily == null) {
            return true;
        }
        if (hintColumnFamily.getSortedColumns().size() != 1) {
            return false;
        }
        return hintColumnFamily.getColumn(startColumn) != null;
    }

    /**
     * Blocks until {@code endpoint} has gossiped a schema version and that version equals the one
     * gossip holds for the local broadcast address.
     *
     * Works in two phases, each polling once per second and each bounded by
     * {@code 2 * StorageService.RING_DELAY} milliseconds: first wait for the endpoint to gossip any
     * SCHEMA state, then wait for that state to match the local one.
     *
     * The endpoint's gossip state is re-read and null-checked on every poll: a node that is removed
     * from gossip while we wait would otherwise cause a NullPointerException in the delivery task.
     * That case is reported as a {@link TimeoutException}, which the caller already treats as
     * "abort this delivery".
     *
     * @param endpoint the node hints are about to be delivered to
     * @return milliseconds spent waiting in the second (agreement) phase
     * @throws TimeoutException if either phase exceeds its bound, or the endpoint has no gossip state
     */
    private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException {
        Gossiper gossiper = Gossiper.instance;
        long maxWaitMillis = 2 * StorageService.RING_DELAY;

        // Phase 1: wait for the endpoint to gossip a schema version at all.
        int waited = 0;
        while (true) {
            org.apache.cassandra.gms.EndpointState remoteState = gossiper.getEndpointStateForEndpoint(endpoint);
            if (remoteState == null) {
                throw new TimeoutException("Node " + endpoint + " vanished while waiting for agreement");
            }
            if (remoteState.getApplicationState(ApplicationState.SCHEMA) != null) {
                break;
            }
            Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
            waited += 1000;
            if (waited > maxWaitMillis) {
                throw new TimeoutException("Didin't receive gossiped schema from " + endpoint + " in " + maxWaitMillis + "ms");
            }
        }

        // Phase 2: wait for the gossiped remote version to equal the gossiped local version.
        waited = 0;
        while (true) {
            org.apache.cassandra.gms.EndpointState remoteState = gossiper.getEndpointStateForEndpoint(endpoint);
            if (remoteState == null) {
                throw new TimeoutException("Node " + endpoint + " vanished while waiting for agreement");
            }
            org.apache.cassandra.gms.EndpointState localState = gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress());
            if (remoteState.getApplicationState(ApplicationState.SCHEMA).value.equals(localState.getApplicationState(ApplicationState.SCHEMA).value)) {
                break;
            }
            Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
            waited += 1000;
            if (waited > maxWaitMillis) {
                throw new TimeoutException("Could not reach schema agreement with " + endpoint + " in " + maxWaitMillis + "ms");
            }
        }
        logger.debug("schema for {} matches local schema", (Object)endpoint);
        return waited;
    }

    /**
     * Runs the pre-flight checks for a hint delivery and, when they all pass, delivers the hints.
     *
     * Delivery is skipped when the hints store is empty, when delivery is paused, when schema
     * agreement with the endpoint is not reached in time, or when the failure detector no longer
     * considers the endpoint alive after the schema wait.
     *
     * @param endpoint the node to deliver hints to
     */
    private void deliverHintsToEndpoint(InetAddress endpoint) {
        // Nothing stored at all: no point in even talking to gossip.
        if (this.hintStore.isEmpty()) {
            return;
        }

        if (this.hintedHandOffPaused) {
            logger.debug("Hints delivery process is paused, aborting");
            return;
        }

        logger.debug("Checking remote({}) schema before delivering hints", (Object)endpoint);
        try {
            this.waitForSchemaAgreement(endpoint);
        }
        catch (TimeoutException noAgreement) {
            // No schema agreement within the allowed time: give up on this delivery.
            return;
        }

        // The schema wait may have taken a while; re-check liveness before sending anything.
        boolean stillAlive = FailureDetector.instance.isAlive(endpoint);
        if (!stillAlive) {
            logger.debug("Endpoint {} died before hint delivery, aborting", (Object)endpoint);
            return;
        }

        this.doDeliverHintsToEndpoint(endpoint);
    }

    /**
     * Replays the hints stored for {@code endpoint}, page by page, then compacts the hints store.
     *
     * For every page of the endpoint's hint row:
     * <ul>
     *   <li>abort if the failure detector reports the endpoint as down;</li>
     *   <li>for each live hint column, deserialize the stored mutation using the version encoded in
     *       the column name, drop the parts that target column families truncated after the hint was
     *       written, and send what remains to the endpoint (rate limited by serialized size);</li>
     *   <li>wait for every response of the page; a write timeout aborts the whole delivery.</li>
     * </ul>
     * A hint is deleted locally when its write is acknowledged, when it refers to an unknown column
     * family, or when nothing is left of it after truncation filtering. If delivery is paused
     * mid-page the paging loop is left immediately, without waiting for outstanding responses.
     *
     * @param endpoint the node to deliver hints to
     * @throws RuntimeException wrapping any failure of the final compaction
     */
    private void doDeliverHintsToEndpoint(InetAddress endpoint) {
        UUID hostId = Gossiper.instance.getHostId(endpoint);
        logger.info("Started hinted handoff for host: {} with IP: {}", hostId, endpoint);
        final ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
        DecoratedKey epkey = StorageService.getPartitioner().decorateKey(hostIdBytes);
        final AtomicInteger rowsReplayed = new AtomicInteger(0);
        ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
        int pageSize = this.calculatePageSize();
        logger.debug("Using pageSize of {}", pageSize);

        // The configured throttle is shared between the other nodes of the ring. Clamp the divisor
        // so a ring with a single known endpoint cannot cause a division by zero.
        int otherEndpoints = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
        int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB() / otherEndpoints;
        // Rate is in bytes per second; 0 disables throttling. Multiply as double to avoid int overflow.
        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024.0);

        delivery:
        while (true) {
            long now = System.currentTimeMillis();
            QueryFilter filter = QueryFilter.getSliceFilter(epkey, "hints", startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize, now);
            ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(this.hintStore.getColumnFamily(filter), (int)(now / 1000L));
            if (HintedHandOffManager.pagingFinished(hintsPage, startColumn)) {
                break;
            }

            if (!FailureDetector.instance.isAlive(endpoint)) {
                logger.info("Endpoint {} died during hint delivery; aborting ({} delivered)", endpoint, rowsReplayed);
                return;
            }

            List<WriteResponseHandler> responseHandlers = Lists.newArrayList();
            for (final Column hint : hintsPage) {
                if (this.hintedHandOffPaused) {
                    logger.debug("Hints delivery process is paused, aborting");
                    break delivery;
                }
                // Skip columns that are no longer live (e.g. hints deleted since the page was read).
                if (!hint.isLive(System.currentTimeMillis())) {
                    continue;
                }
                // Next page starts from the last hint we looked at.
                startColumn = hint.name();

                // Column name is (hint id, serialization version).
                ByteBuffer[] components = comparator.split(hint.name());
                int version = (Integer)Int32Type.instance.compose(components[1]);
                DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(hint.value()));
                RowMutation rm;
                try {
                    rm = RowMutation.serializer.deserialize(in, version);
                }
                catch (UnknownColumnFamilyException e) {
                    logger.debug("Skipping delivery of hint for deleted columnfamily", (Throwable)e);
                    HintedHandOffManager.deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
                    continue;
                }
                catch (IOException e) {
                    throw new AssertionError((Object)e);
                }

                // Drop the parts of the mutation whose column family was truncated after the hint
                // was written. Each cfId occurs once per mutation, so the truncation time is simply
                // looked up per id.
                for (UUID cfId : ImmutableSet.copyOf(rm.getColumnFamilyIds())) {
                    ColumnFamilyStore cfs = Keyspace.open(rm.getKeyspaceName()).getColumnFamilyStore(cfId);
                    long truncatedAt = cfs.getTruncationTime();
                    if (hint.maxTimestamp() >= truncatedAt) {
                        continue;
                    }
                    logger.debug("Skipping delivery of hint for truncated columnfamily {}", cfId);
                    rm = rm.without(cfId);
                }
                if (rm.isEmpty()) {
                    HintedHandOffManager.deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
                    continue;
                }

                MessageOut<RowMutation> message = rm.createMessage();
                rateLimiter.acquire(message.serializedSize(7));
                // Runs once the endpoint acknowledges the write: count it and drop the local hint.
                Runnable callback = new Runnable(){

                    @Override
                    public void run() {
                        rowsReplayed.incrementAndGet();
                        HintedHandOffManager.deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
                    }
                };
                WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback);
                MessagingService.instance().sendRR(message, endpoint, responseHandler);
                responseHandlers.add(responseHandler);
            }

            // Wait for the whole page before reading the next one.
            for (WriteResponseHandler handler : responseHandlers) {
                try {
                    handler.get();
                }
                catch (WriteTimeoutException e) {
                    logger.info("Timed out replaying hints to {}; aborting ({} delivered)", endpoint, rowsReplayed);
                    return;
                }
            }
        }

        logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed, endpoint);
        try {
            this.compact().get();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Chooses how many hint columns to read per page.
     *
     * The in-memory compaction limit is divided by the average column size (mean row size divided
     * by mean column count of the hints store); the result is clamped to the range
     * [2, {@link #PAGE_SIZE}]. When no usable statistics are available (mean column count or
     * average column size is not positive) {@link #PAGE_SIZE} is returned, which also avoids a
     * division by zero when the mean row size is smaller than the mean column count.
     *
     * @return the page size, between 2 and {@link #PAGE_SIZE} inclusive
     */
    private int calculatePageSize() {
        int meanColumnCount = this.hintStore.getMeanColumns();
        if (meanColumnCount <= 0) {
            return PAGE_SIZE;
        }

        int averageColumnSize = (int)(this.hintStore.getMeanRowSize() / (long)meanColumnCount);
        if (averageColumnSize <= 0) {
            return PAGE_SIZE;
        }

        // The lower bound is 2: pagingFinished() treats a page holding only the start column as the end.
        return Math.max(2, Math.min(PAGE_SIZE, DatabaseDescriptor.getInMemoryCompactionLimit() / averageColumnSize));
    }

    /**
     * Scans the row keys of the hints store and schedules a delivery for every host id that the
     * token metadata can map to an endpoint. Rows whose host id has no known endpoint are skipped.
     */
    private void scheduleAllDeliveries() {
        logger.debug("Started scheduleAllDeliveries");

        // Full-ring range (min token to min token) with an empty column-name filter: only the row
        // keys are of interest here.
        IPartitioner partitioner = StorageService.getPartitioner();
        Token.KeyBound ringStart = ((Token)partitioner.getMinimumToken()).minKeyBound();
        Range<RowPosition> wholeRing = new Range<RowPosition>(ringStart, ringStart, partitioner);
        NamesQueryFilter noColumns = new NamesQueryFilter((SortedSet<ByteBuffer>)ImmutableSortedSet.of());

        List<Row> hintRows = this.hintStore.getRangeSlice(wholeRing, null, noColumns, Integer.MAX_VALUE, System.currentTimeMillis());
        for (Row hintRow : hintRows) {
            UUID hostId = UUIDGen.getUUID(hintRow.key.key);
            InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId);
            if (target != null) {
                this.scheduleHintDelivery(target);
            }
        }

        logger.debug("Finished scheduleAllDeliveries");
    }

    /**
     * Queues an asynchronous hint delivery to {@code to}, unless one is already queued or running
     * for that endpoint.
     *
     * The endpoint is recorded in {@code queuedDeliveries} before the task is submitted and removed
     * again when the task finishes, whether it succeeds or fails. If the executor refuses the task,
     * the endpoint is removed immediately and the exception is rethrown; otherwise the stale entry
     * would block every later delivery to that endpoint.
     *
     * @param to the node to deliver hints to
     */
    public void scheduleHintDelivery(final InetAddress to) {
        // add() returns false when the endpoint is already present: never deliver to one host twice at once.
        if (!this.queuedDeliveries.add(to)) {
            return;
        }
        logger.debug("Scheduling delivery of Hints to {}", (Object)to);
        try {
            this.executor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        HintedHandOffManager.this.deliverHintsToEndpoint(to);
                    }
                    finally {
                        HintedHandOffManager.this.queuedDeliveries.remove(to);
                    }
                }
            });
        }
        catch (RuntimeException e) {
            // The task never made it into the executor, so its finally block will not run.
            this.queuedDeliveries.remove(to);
            throw e;
        }
    }

    /**
     * Resolves {@code to} and schedules a hint delivery to the resulting address.
     *
     * @param to IP address or host name of the node to deliver hints to
     * @throws UnknownHostException if {@code to} cannot be resolved
     */
    @Override
    public void scheduleHintDelivery(String to) throws UnknownHostException {
        InetAddress target = InetAddress.getByName(to);
        this.scheduleHintDelivery(target);
    }

    /**
     * Pauses or resumes hint delivery by setting the flag the delivery code checks before starting
     * and before each hint.
     *
     * @param b {@code true} to pause delivery, {@code false} to resume it
     */
    @Override
    public void pauseHintsDelivery(boolean b) {
        hintedHandOffPaused = b;
    }

    /**
     * Lists the tokens of the hint rows that still contain data.
     *
     * Each hint row is read with a one-column slice; rows whose column family came back
     * {@code null} are skipped. Every token is inserted at the head of the result, so the list is
     * in reverse order of the scan.
     *
     * @return string form of the token of each hint row that has data
     */
    @Override
    public List<String> listEndpointsPendingHints() {
        Token.TokenFactory tokenFactory = StorageService.getPartitioner().getTokenFactory();
        LinkedList<String> pending = new LinkedList<String>();
        for (Row hintRow : this.getHintsSlice(1)) {
            if (hintRow.cf != null) {
                String token = tokenFactory.toString(hintRow.key.token);
                pending.addFirst(token);
            }
        }
        return pending;
    }

    private List<Row> getHintsSlice(int columnCount) {
        SliceQueryFilter predicate = new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, columnCount);
        IPartitioner partitioner = StorageService.getPartitioner();
        Token.KeyBound minPos = ((Token)partitioner.getMinimumToken()).minKeyBound();
        Range<RowPosition> range = new Range<RowPosition>(minPos, minPos);
        try {
            RangeSliceCommand cmd = new RangeSliceCommand("system", "hints", System.currentTimeMillis(), predicate, range, null, 65536);
            return StorageProxy.getRangeSlice(cmd, ConsistencyLevel.ONE);
        }
        catch (Exception e) {
            logger.info("HintsCF getEPPendingHints timed out.");
            throw new RuntimeException(e);
        }
    }
}

