/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.DebuggableTask;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.MessageParams;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.RejectException;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.TruncateRequest;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.view.ViewUtils;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.CasWriteTimeoutException;
import org.apache.cassandra.exceptions.CasWriteUnknownResultException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.IsBootstrappingException;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.exceptions.QueryCancelledException;
import org.apache.cassandra.exceptions.ReadAbortException;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.RequestFailureException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.RequestTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.exceptions.WriteFailureException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.metrics.CASClientRequestMetrics;
import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
import org.apache.cassandra.metrics.DenylistMetrics;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.net.ForwardingInfo;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageFlag;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.PartitionDenylist;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.service.BatchlogResponseHandler;
import org.apache.cassandra.service.CASRequest;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxyMBean;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.TruncateResponseHandler;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.service.paxos.Ballot;
import org.apache.cassandra.service.paxos.BallotGenerator;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.ContentionStrategy;
import org.apache.cassandra.service.paxos.Paxos;
import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.service.paxos.v1.PrepareCallback;
import org.apache.cassandra.service.paxos.v1.PrepareVerbHandler;
import org.apache.cassandra.service.paxos.v1.ProposeCallback;
import org.apache.cassandra.service.paxos.v1.ProposeVerbHandler;
import org.apache.cassandra.service.reads.AbstractReadExecutor;
import org.apache.cassandra.service.reads.ReadCallback;
import org.apache.cassandra.service.reads.range.RangeCommands;
import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.triggers.TriggerExecutor;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StorageProxy
implements StorageProxyMBean {
    // JMX-style object name for this proxy. NOTE(review): the MBean registration itself is not in this part of the file.
    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
    private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
    // Marker string; its consumers are not visible in this part of the file.
    public static final String UNREACHABLE = "UNREACHABLE";
    // Read once, at class initialisation, from CassandraRelevantProperties.
    private static final int FAILURE_LOGGING_INTERVAL_SECONDS = CassandraRelevantProperties.FAILURE_LOGGING_INTERVAL_SECONDS.getInt();
    // Write strategies handed to performWrite/maybeTryAdditionalReplicas (see mutate()).
    // They are declared without an initializer here, so they must be assigned in a static initializer elsewhere in the class.
    private static final WritePerformer standardWritePerformer;
    private static final WritePerformer counterWritePerformer;
    private static final WritePerformer counterWriteOnCoordinatorPerformer;
    // Shared instance; the constructor is private, so outside code cannot create another one.
    public static final StorageProxy instance;
    // Presumably an upper bound on concurrently outstanding hints; volatile, so it may be changed at runtime - TODO confirm against its users.
    private static volatile int maxHintsInProgress;
    // Loader producing one AtomicInteger per endpoint; presumably a per-endpoint in-flight hint counter - verify against its users.
    private static final CacheLoader<InetAddressAndPort, AtomicInteger> hintsInProgress;
    // Denylist support: cas() consults partitionDenylist and bumps denylistMetrics when it rejects a write.
    private static final DenylistMetrics denylistMetrics;
    private static final PartitionDenylist partitionDenylist;
    // Starts at Long.MIN_VALUE; judging by the name, a nanosecond deadline for logging blocking read repair attempts - TODO confirm.
    private volatile long logBlockingReadRepairAttemptsUntilNanos = Long.MIN_VALUE;

    /**
     * Private no-op constructor: instances can only be created from inside this class
     * (the shared one is exposed through the static {@code instance} field).
     */
    private StorageProxy() {
    }

    /**
     * Entry point for a compare-and-set write on a single partition.
     *
     * <p>When the partition denylist is enabled for writes and the key is not permitted, the
     * rejection is counted and the request fails. Otherwise the call is routed to
     * {@code Paxos.cas} when {@code Paxos.useV2()} is true, and to {@link #legacyCas} if not.
     *
     * @param keyspaceName        keyspace of the target table
     * @param cfName              name of the target table
     * @param key                 partition key the operation applies to
     * @param request             the CAS request (condition and updates)
     * @param consistencyForPaxos consistency level for the Paxos phases
     * @param consistencyForCommit consistency level for the commit phase
     * @param clientState         state of the issuing client
     * @param nowInSeconds        forwarded to {@link #legacyCas}; not used on the v2 path
     * @param queryStartNanoTime  forwarded to {@link #legacyCas}; not used on the v2 path
     * @return whatever the selected implementation returns
     * @throws InvalidRequestException if the partition is denylisted for writes
     */
    public static RowIterator cas(String keyspaceName, String cfName, DecoratedKey key, CASRequest request, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, ClientState clientState, long nowInSeconds, long queryStartNanoTime) throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException, CasWriteUnknownResultException {
        boolean writeDenylistActive = DatabaseDescriptor.getPartitionDenylistEnabled() && DatabaseDescriptor.getDenylistWritesEnabled();
        if (writeDenylistActive && !partitionDenylist.isKeyPermitted(keyspaceName, cfName, key.getKey())) {
            denylistMetrics.incrementWritesRejected();
            String reason = String.format("Unable to CAS write to denylisted partition [0x%s] in %s/%s", key.toString(), keyspaceName, cfName);
            throw new InvalidRequestException(reason);
        }

        if (Paxos.useV2()) {
            return Paxos.cas(key, request, consistencyForPaxos, consistencyForCommit, clientState);
        }
        return StorageProxy.legacyCas(keyspaceName, cfName, key, request, consistencyForPaxos, consistencyForCommit, clientState, nowInSeconds, queryStartNanoTime);
    }

    /**
     * Compare-and-set through {@link #doPaxos}, used by {@link #cas} when {@code Paxos.useV2()} is false.
     *
     * <p>The proposal callback handed to {@link #doPaxos} reads the current partition (at
     * LOCAL_QUORUM when {@code consistencyForPaxos} is LOCAL_SERIAL, QUORUM otherwise), checks the
     * request's condition against it and, if it holds, builds the updates and runs them through
     * the trigger executor.
     *
     * <p>Outcome metrics (unknown result, timeout, abort, failure, unavailable) are marked before
     * the corresponding exception is rethrown, and latency is always recorded in the finally block.
     *
     * @return the current rows when the condition did not hold, {@code null} when the updates were proposed
     */
    public static RowIterator legacyCas(String keyspaceName, String cfName, DecoratedKey key, CASRequest request, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, ClientState clientState, long nowInSeconds, long queryStartNanoTime) throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException {
        final long metricsStartNanos = Clock.Global.nanoTime();
        try {
            TableMetadata metadata = Schema.instance.validateTable(keyspaceName, cfName);

            Function<Ballot, Pair<PartitionUpdate, RowIterator>> proposalBuilder = ballot -> {
                Tracing.trace("Reading existing values for CAS precondition");
                SinglePartitionReadCommand preconditionRead = request.readCommand(nowInSeconds);
                ConsistencyLevel readLevel;
                if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL) {
                    readLevel = ConsistencyLevel.LOCAL_QUORUM;
                } else {
                    readLevel = ConsistencyLevel.QUORUM;
                }

                FilteredPartition existing;
                try (RowIterator rows = StorageProxy.readOne(preconditionRead, readLevel, queryStartNanoTime)) {
                    existing = FilteredPartition.create(rows);
                }

                if (request.appliesTo(existing)) {
                    // Condition holds: build the client's updates and record their size.
                    PartitionUpdate proposed = request.makeUpdates(existing, clientState, ballot);
                    ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(proposed);
                    long proposedBytes = proposed.dataSize();
                    ClientRequestsMetricsHolder.casWriteMetrics.mutationSize.update(proposedBytes);
                    ClientRequestsMetricsHolder.writeMetricsForLevel(consistencyForPaxos).mutationSize.update(proposedBytes);
                    proposed = TriggerExecutor.instance.execute(proposed);
                    return Pair.create(proposed, null);
                }

                // Condition failed: propose an empty update and hand the current rows back to the caller.
                Tracing.trace("CAS precondition does not match current values {}", existing);
                ClientRequestsMetricsHolder.casWriteMetrics.conditionNotMet.inc();
                return Pair.create(PartitionUpdate.emptyUpdate(metadata, key), existing.rowIterator());
            };

            // The commit consistency is passed for both the replay-commit and the commit arguments.
            return StorageProxy.doPaxos(metadata, key, consistencyForPaxos, consistencyForCommit, consistencyForCommit, queryStartNanoTime, ClientRequestsMetricsHolder.casWriteMetrics, proposalBuilder);
        }
        catch (CasWriteUnknownResultException unknown) {
            ClientRequestsMetricsHolder.casWriteMetrics.unknownResult.mark();
            throw unknown;
        }
        catch (CasWriteTimeoutException casTimeout) {
            ClientRequestsMetricsHolder.casWriteMetrics.timeouts.mark();
            ClientRequestsMetricsHolder.writeMetricsForLevel(consistencyForPaxos).timeouts.mark();
            // A fresh exception carrying the same fields is thrown rather than the caught instance.
            throw new CasWriteTimeoutException(casTimeout.writeType, casTimeout.consistency, casTimeout.received, casTimeout.blockFor, casTimeout.contentions);
        }
        catch (ReadTimeoutException readTimeout) {
            ClientRequestsMetricsHolder.casWriteMetrics.timeouts.mark();
            ClientRequestsMetricsHolder.writeMetricsForLevel(consistencyForPaxos).timeouts.mark();
            throw readTimeout;
        }
        catch (ReadAbortException aborted) {
            ClientRequestsMetricsHolder.casWriteMetrics.markAbort(aborted);
            ClientRequestsMetricsHolder.writeMetricsForLevel(consistencyForPaxos).markAbort(aborted);
            throw aborted;
        }
        catch (ReadFailureException | WriteFailureException failure) {
            ClientRequestsMetricsHolder.casWriteMetrics.failures.mark();
            ClientRequestsMetricsHolder.writeMetricsForLevel(consistencyForPaxos).failures.mark();
            throw failure;
        }
        catch (UnavailableException unavailable) {
            ClientRequestsMetricsHolder.casWriteMetrics.unavailables.mark();
            ClientRequestsMetricsHolder.writeMetricsForLevel(consistencyForPaxos).unavailables.mark();
            throw unavailable;
        }
        finally {
            final long elapsedNanos = Clock.Global.nanoTime() - metricsStartNanos;
            ClientRequestsMetricsHolder.casWriteMetrics.addNano(elapsedNanos);
            ClientRequestsMetricsHolder.writeMetricsForLevel(consistencyForPaxos).addNano(elapsedNanos);
        }
    }

    /**
     * Records CAS contention for one operation: updates the request-level contention metric and
     * adds a sample for the partition key to the table's {@code topCasPartitionContention} metric.
     * Does nothing when {@code contentions} is zero.
     */
    private static void recordCasContention(TableMetadata table, DecoratedKey key, CASClientRequestMetrics casMetrics, int contentions) {
        if (contentions != 0) {
            casMetrics.contention.update(contentions);
            ColumnFamilyStore store = Keyspace.open(table.keyspace).getColumnFamilyStore(table.name);
            store.metric.topCasPartitionContention.addSample(key.getKey(), contentions);
        }
    }

    /**
     * Runs prepare/propose/commit attempts until one is accepted or the CAS contention timeout,
     * measured from {@code queryStartNanoTime}, has elapsed.
     *
     * <p>Each attempt obtains a ballot through {@link #beginAndRepairPaxos}, asks
     * {@code createUpdateProposal} for the update to propose, and proposes it. An accepted
     * proposal is committed unless its update is empty. A rejected proposal counts as one
     * contention and is retried after a random sleep of under 100 ms.
     *
     * @param createUpdateProposal maps the ballot to a pair of (update to propose, result to
     *                             return to the caller); a {@code null} pair makes this method
     *                             return {@code null} immediately
     * @return the right-hand element of the proposal pair once the proposal was accepted
     *         (non-null is traced as "CAS did not apply"), or {@code null} as described above
     * @throws CasWriteTimeoutException if the contention timeout elapses, or in place of any
     *                                  {@link WriteTimeoutException} raised by the steps above
     */
    private static RowIterator doPaxos(TableMetadata metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForReplayCommits, ConsistencyLevel consistencyForCommit, long queryStartNanoTime, CASClientRequestMetrics casMetrics, Function<Ballot, Pair<PartitionUpdate, RowIterator>> createUpdateProposal) throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException {
        int contentions = 0;
        Keyspace keyspace = Keyspace.open(metadata.keyspace);
        AbstractReplicationStrategy latestRs = keyspace.getReplicationStrategy();
        try {
            consistencyForPaxos.validateForCas();
            consistencyForReplayCommits.validateForCasCommit(latestRs);
            consistencyForCommit.validateForCasCommit(latestRs);

            final long contentionTimeoutNanos = DatabaseDescriptor.getCasContentionTimeout(TimeUnit.NANOSECONDS);
            while (Clock.Global.nanoTime() - queryStartNanoTime < contentionTimeoutNanos) {
                // A fresh replica plan is computed for every attempt.
                ReplicaPlan.ForPaxosWrite attemptPlan = ReplicaPlans.forPaxos(keyspace, key, consistencyForPaxos);
                latestRs = attemptPlan.replicationStrategy();

                PaxosBallotAndContention prepared = StorageProxy.beginAndRepairPaxos(queryStartNanoTime, key, metadata, attemptPlan, consistencyForPaxos, consistencyForReplayCommits, casMetrics);
                Ballot ballot = prepared.ballot;
                contentions += prepared.contentions;

                Pair<PartitionUpdate, RowIterator> proposalPair = createUpdateProposal.apply(ballot);
                if (proposalPair == null) {
                    return null;
                }

                Commit proposal = Commit.newProposal(ballot, proposalPair.left);
                Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
                if (!StorageProxy.proposePaxos(proposal, attemptPlan, true, queryStartNanoTime)) {
                    Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)");
                    ++contentions;
                    Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS);
                    continue;
                }

                // Accepted: an empty update is not committed.
                if (!proposal.update.isEmpty()) {
                    StorageProxy.commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime);
                }
                RowIterator outcome = proposalPair.right;
                Tracing.trace(outcome != null ? "CAS did not apply" : "CAS applied successfully");
                return outcome;
            }
        }
        catch (CasWriteTimeoutException casTimeout) {
            // Fold the contentions carried by the exception into the total recorded in finally.
            contentions += casTimeout.contentions;
            throw casTimeout;
        }
        catch (WriteTimeoutException timeout) {
            throw new CasWriteTimeoutException(timeout.writeType, timeout.consistency, timeout.received, timeout.blockFor, contentions);
        }
        finally {
            StorageProxy.recordCasContention(metadata, key, casMetrics, contentions);
        }

        // The loop ended without a result: the contention timeout elapsed.
        throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(latestRs), contentions);
    }

    /**
     * Obtains a promised ballot for {@code key}, finishing or repairing earlier rounds first.
     *
     * <p>Loops until the CAS contention timeout (measured from {@code queryStartNanoTime})
     * elapses. Each iteration prepares a new ballot and then:
     * <ul>
     *   <li>if the prepare was not promised, counts a contention, sleeps a random time under
     *       100 ms and retries;</li>
     *   <li>if a non-empty in-progress commit is newer than the most recent commit, re-proposes
     *       it under the new ballot and commits it if accepted (otherwise counts a contention
     *       and sleeps), then retries;</li>
     *   <li>if some replicas miss the most recent commit, sends it to them and retries;</li>
     *   <li>otherwise returns the ballot with the number of contentions seen.</li>
     * </ul>
     *
     * @throws CasWriteTimeoutException on timeout, or in place of a {@link WriteTimeoutException}
     *                                  raised while preparing, proposing or committing
     */
    private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoTime, DecoratedKey key, TableMetadata metadata, ReplicaPlan.ForPaxosWrite paxosPlan, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, CASClientRequestMetrics casMetrics) throws WriteTimeoutException, WriteFailureException {
        final long contentionTimeoutNanos = DatabaseDescriptor.getCasContentionTimeout(TimeUnit.NANOSECONDS);
        PrepareCallback summary = null;
        int contentions = 0;

        while (Clock.Global.nanoTime() - queryStartNanoTime < contentionTimeoutNanos) {
            // After the first round, request a ballot timestamp strictly above the in-progress one last seen.
            long minTimestampMicrosToUse = Long.MIN_VALUE;
            if (summary != null) {
                minTimestampMicrosToUse = 1L + summary.mostRecentInProgressCommit.ballot.unixMicros();
            }
            Ballot.Flag ballotFlag = consistencyForPaxos == ConsistencyLevel.SERIAL ? Ballot.Flag.GLOBAL : Ballot.Flag.LOCAL;
            Ballot ballot = BallotGenerator.Global.nextBallot(minTimestampMicrosToUse, ballotFlag);

            try {
                Tracing.trace("Preparing {}", ballot);
                Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
                summary = StorageProxy.preparePaxos(toPrepare, paxosPlan, queryStartNanoTime);

                if (!summary.promised) {
                    Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
                    ++contentions;
                    Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS);
                    continue;
                }

                Commit inProgress = summary.mostRecentInProgressCommit;
                Commit mostRecent = summary.mostRecentCommit;

                boolean unfinishedRound = !inProgress.update.isEmpty() && inProgress.isAfter(mostRecent);
                if (unfinishedRound) {
                    Tracing.trace("Finishing incomplete paxos round {}", inProgress);
                    casMetrics.unfinishedCommit.inc();
                    Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
                    if (StorageProxy.proposePaxos(refreshedInProgress, paxosPlan, false, queryStartNanoTime)) {
                        StorageProxy.commitPaxos(refreshedInProgress, consistencyForCommit, false, queryStartNanoTime);
                    } else {
                        Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
                        ++contentions;
                        Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS);
                    }
                    // Either way, start over with a new ballot.
                    continue;
                }

                Iterable<InetAddressAndPort> missingMRC = summary.replicasMissingMostRecentCommit();
                if (Iterables.size(missingMRC) > 0) {
                    Tracing.trace("Repairing replicas that missed the most recent commit");
                    StorageProxy.sendCommit(mostRecent, missingMRC);
                    continue;
                }

                return new PaxosBallotAndContention(ballot, contentions);
            }
            catch (WriteTimeoutException timeout) {
                // Reported with WriteType.CAS instead of the original exception's write type.
                throw new CasWriteTimeoutException(WriteType.CAS, timeout.consistency, timeout.received, timeout.blockFor, contentions);
            }
        }

        throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(paxosPlan.replicationStrategy()), contentions);
    }

    /**
     * Sends a {@code PAXOS_COMMIT_REQ} carrying {@code commit} to every endpoint in
     * {@code replicas}. No callback is registered and nothing is awaited.
     */
    private static void sendCommit(Commit commit, Iterable<InetAddressAndPort> replicas) {
        final Message<Commit> commitRequest = Message.out(Verb.PAXOS_COMMIT_REQ, commit);
        replicas.forEach(endpoint -> MessagingService.instance().send(commitRequest, endpoint));
    }

    /**
     * Sends a Paxos prepare for {@code toPrepare} to every contact in {@code replicaPlan} and
     * blocks until the callback's {@code await()} returns.
     *
     * <p>The local replica, if contacted, is handled on the {@code PAXOS_PREPARE_REQ} stage
     * instead of going through messaging. Its result is passed to the callback as-is.
     * The decompiled source cast it to {@code Commit} before wrapping it, which would fail with
     * a {@code ClassCastException} that the surrounding catch only logs, so the local response
     * would never reach the callback.
     *
     * <p>Marks {@code writeMetrics.localRequests} when the local replica was among the contacts,
     * {@code writeMetrics.remoteRequests} otherwise.
     *
     * @return the callback holding the collected prepare responses
     * @throws WriteTimeoutException declared by this method; presumably raised by {@code await()}
     */
    private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaPlan.ForPaxosWrite replicaPlan, long queryStartNanoTime) throws WriteTimeoutException {
        PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaPlan.requiredParticipants(), replicaPlan.consistencyLevel(), queryStartNanoTime);
        Message<Commit> message = Message.out(Verb.PAXOS_PREPARE_REQ, toPrepare);
        boolean hasLocalRequest = false;
        for (Replica replica : (EndpointsForToken)replicaPlan.contacts()) {
            if (replica.isSelf()) {
                hasLocalRequest = true;
                Verb.PAXOS_PREPARE_REQ.stage.execute(() -> {
                    try {
                        // Hand the prepare result over untouched; no narrowing cast.
                        callback.onResponse(message.responseWith(PrepareVerbHandler.doPrepare(toPrepare)));
                    }
                    catch (Exception ex) {
                        logger.error("Failed paxos prepare locally", ex);
                    }
                });
            } else {
                MessagingService.instance().sendWithCallback(message, replica.endpoint(), (RequestCallback)callback);
            }
        }
        if (hasLocalRequest) {
            ClientRequestsMetricsHolder.writeMetrics.localRequests.mark();
        } else {
            ClientRequestsMetricsHolder.writeMetrics.remoteRequests.mark();
        }
        callback.await();
        return callback;
    }

    /**
     * Sends a Paxos propose for {@code proposal} to every contact in {@code replicaPlan} and
     * blocks until the callback's {@code await()} returns.
     *
     * <p>The local replica, if contacted, is handled on the {@code PAXOS_PROPOSE_REQ} stage.
     * Its result is wrapped directly into the {@code Message<Boolean>} response. The decompiled
     * source cast it to {@code Commit} first, which contradicts the declared response type and
     * would fail with a {@code ClassCastException} that the surrounding catch only logs.
     *
     * @param backoffIfPartial when true, a proposal that is neither successful nor fully refused
     *                         raises {@link CasWriteUnknownResultException}; its negation is also
     *                         passed to the {@code ProposeCallback} constructor
     *                         (NOTE(review): presumably a fail-fast flag - confirm)
     * @return {@code true} if the callback reports success, {@code false} otherwise
     * @throws CasWriteUnknownResultException see {@code backoffIfPartial}
     * @throws WriteTimeoutException declared by this method; presumably raised by {@code await()}
     */
    private static boolean proposePaxos(Commit proposal, ReplicaPlan.ForPaxosWrite replicaPlan, boolean backoffIfPartial, long queryStartNanoTime) throws WriteTimeoutException, CasWriteUnknownResultException {
        ProposeCallback callback = new ProposeCallback(((EndpointsForToken)replicaPlan.contacts()).size(), replicaPlan.requiredParticipants(), !backoffIfPartial, replicaPlan.consistencyLevel(), queryStartNanoTime);
        Message<Commit> message = Message.out(Verb.PAXOS_PROPOSE_REQ, proposal);
        for (Replica replica : (EndpointsForToken)replicaPlan.contacts()) {
            if (replica.isSelf()) {
                Verb.PAXOS_PROPOSE_REQ.stage.execute(() -> {
                    try {
                        // Hand the propose result over untouched; no narrowing cast.
                        Message<Boolean> response = message.responseWith(ProposeVerbHandler.doPropose(proposal));
                        callback.onResponse(response);
                    }
                    catch (Exception ex) {
                        logger.error("Failed paxos propose locally", ex);
                    }
                });
            } else {
                MessagingService.instance().sendWithCallback(message, replica.endpoint(), (RequestCallback)callback);
            }
        }
        callback.await();
        if (callback.isSuccessful()) {
            return true;
        }
        if (backoffIfPartial && !callback.isFullyRefused()) {
            throw new CasWriteUnknownResultException(replicaPlan.consistencyLevel(), callback.getAcceptCount(), replicaPlan.requiredParticipants());
        }
        return false;
    }

    /**
     * Sends the Paxos commit for {@code proposal} to all replicas (live and down) of its partition.
     *
     * <p>For any consistency level other than {@code ANY} a write response handler is created,
     * live replicas are contacted with it (the local one through {@link #commitPaxosLocal}), and
     * the method blocks on {@code responseHandler.get()}. For {@code ANY} the commit is sent to
     * live replicas without a callback and nothing is awaited.
     *
     * <p>For a replica that is not alive, the handler (if any) is told the request expired, and
     * a hint is submitted when {@code allowHints} is set and {@code shouldHint(replica)} agrees.
     *
     * @throws WriteTimeoutException declared by this method; presumably raised by the handler's {@code get()}
     */
    private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean allowHints, long queryStartNanoTime) throws WriteTimeoutException {
        final boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY;
        Keyspace keyspace = Keyspace.open(proposal.update.metadata().keyspace);
        Token token = proposal.update.partitionKey().getToken();

        ReplicaPlan.ForWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, token, ReplicaPlans.writeAll);
        AbstractWriteResponseHandler<Commit> responseHandler = null;
        if (shouldBlock) {
            AbstractReplicationStrategy strategy = replicaPlan.replicationStrategy();
            responseHandler = strategy.getWriteResponseHandler(replicaPlan, null, WriteType.SIMPLE, proposal::makeMutation, queryStartNanoTime);
        }

        Message<Commit> commitRequest = Message.outWithFlag(Verb.PAXOS_COMMIT_REQ, proposal, MessageFlag.CALL_BACK_ON_FAILURE);
        for (Replica replica : replicaPlan.liveAndDown()) {
            InetAddressAndPort destination = replica.endpoint();
            StorageProxy.checkHintOverload(replica);

            if (!replicaPlan.isAlive(replica)) {
                // Down replica: mark the request as expired and hint if permitted.
                if (responseHandler != null) {
                    responseHandler.expired();
                }
                if (allowHints && StorageProxy.shouldHint(replica)) {
                    StorageProxy.submitHint(proposal.makeMutation(), replica, null);
                }
            } else if (!shouldBlock) {
                MessagingService.instance().send(commitRequest, destination);
            } else if (replica.isSelf()) {
                StorageProxy.commitPaxosLocal(replica, commitRequest, responseHandler);
            } else {
                MessagingService.instance().sendWriteWithCallback(commitRequest, replica, responseHandler);
            }
        }

        if (shouldBlock) {
            responseHandler.get();
        }
    }

    /**
     * Applies a Paxos commit on the local replica via the {@code PAXOS_COMMIT_REQ} stage
     * ({@code maybeExecuteImmediately}).
     *
     * <p>On success the handler, if present, receives {@code onResponse(null)}. On failure the
     * exception is logged (except for {@link WriteTimeoutException}) and the handler, if present,
     * receives {@code onFailure} with the local broadcast address. The failure path null-checks
     * the handler just like the success path, so a missing handler cannot turn a commit failure
     * into a {@code NullPointerException}.
     *
     * @param localReplica    the local replica the commit is applied for
     * @param message         commit message whose payload is applied
     * @param responseHandler handler to notify; may be {@code null}
     */
    private static void commitPaxosLocal(Replica localReplica, final Message<Commit> message, final AbstractWriteResponseHandler<?> responseHandler) {
        Verb.PAXOS_COMMIT_REQ.stage.maybeExecuteImmediately(new LocalMutationRunnable(localReplica){

            @Override
            public void runMayThrow() {
                try {
                    PaxosState.commitDirect(message.payload);
                    if (responseHandler != null) {
                        responseHandler.onResponse(null);
                    }
                }
                catch (Exception ex) {
                    if (!(ex instanceof WriteTimeoutException)) {
                        logger.error("Failed to apply paxos commit locally : ", ex);
                    }
                    // Mirror the null check of the success path.
                    if (responseHandler != null) {
                        responseHandler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.forException(ex));
                    }
                }
            }

            @Override
            public String description() {
                return "Paxos " + message.payload.toString();
            }

            @Override
            protected Verb verb() {
                return Verb.PAXOS_COMMIT_REQ;
            }
        });
    }

    /**
     * Applies a list of mutations at the given consistency level and waits for the responses.
     *
     * <p>Counter mutations go through {@code mutateCounter}, everything else through
     * {@code performWrite} with the standard write performer. Non-counter handlers are then
     * given the chance to contact additional replicas, and finally every handler is awaited.
     *
     * <p>A write timeout or failure at {@code ConsistencyLevel.ANY} is absorbed by writing hints
     * for the mutations. At any other level the matching metrics are marked and the exception is
     * rethrown. Unavailable and overloaded errors both mark the {@code unavailables} metrics
     * before being rethrown. Latency metrics are always recorded.
     *
     * @param mutations          the mutations to apply
     * @param consistencyLevel   consistency level required for the writes
     * @param queryStartNanoTime start time of the query, forwarded to the write paths
     */
    public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException {
        Tracing.trace("Determining replicas for mutation");
        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
        final long startTime = Clock.Global.nanoTime();
        List<AbstractWriteResponseHandler<IMutation>> responseHandlers = new ArrayList<>(mutations.size());
        // A single mutation is a SIMPLE write; several form an UNLOGGED_BATCH.
        WriteType plainWriteType = mutations.size() <= 1 ? WriteType.SIMPLE : WriteType.UNLOGGED_BATCH;
        try {
            for (IMutation mutation : mutations) {
                if (mutation instanceof CounterMutation) {
                    responseHandlers.add(StorageProxy.mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime));
                } else {
                    responseHandlers.add(StorageProxy.performWrite(mutation, consistencyLevel, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime));
                }
            }

            // Handlers are index-aligned with the mutations; counters are skipped here.
            for (int i = 0; i < mutations.size(); ++i) {
                IMutation mutation = mutations.get(i);
                if (!(mutation instanceof CounterMutation)) {
                    responseHandlers.get(i).maybeTryAdditionalReplicas(mutation, standardWritePerformer, localDataCenter);
                }
            }

            for (AbstractWriteResponseHandler<IMutation> handler : responseHandlers) {
                handler.get();
            }
        }
        catch (WriteFailureException | WriteTimeoutException ex) {
            if (consistencyLevel != ConsistencyLevel.ANY) {
                if (ex instanceof WriteFailureException) {
                    ClientRequestsMetricsHolder.writeMetrics.failures.mark();
                    ClientRequestsMetricsHolder.writeMetricsForLevel(consistencyLevel).failures.mark();
                    WriteFailureException failure = (WriteFailureException)ex;
                    Tracing.trace("Write failure; received {} of {} required replies, failed {} requests", failure.received, failure.blockFor, failure.failureReasonByEndpoint.size());
                } else {
                    ClientRequestsMetricsHolder.writeMetrics.timeouts.mark();
                    ClientRequestsMetricsHolder.writeMetricsForLevel(consistencyLevel).timeouts.mark();
                    WriteTimeoutException timeout = (WriteTimeoutException)ex;
                    Tracing.trace("Write timeout; received {} of {} required replies", timeout.received, timeout.blockFor);
                }
                throw ex;
            }
            // At ANY the write is satisfied by hints instead of failing.
            StorageProxy.hintMutations(mutations);
        }
        catch (UnavailableException e) {
            ClientRequestsMetricsHolder.writeMetrics.unavailables.mark();
            ClientRequestsMetricsHolder.writeMetricsForLevel(consistencyLevel).unavailables.mark();
            Tracing.trace("Unavailable");
            throw e;
        }
        catch (OverloadedException e) {
            ClientRequestsMetricsHolder.writeMetrics.unavailables.mark();
            ClientRequestsMetricsHolder.writeMetricsForLevel(consistencyLevel).unavailables.mark();
            Tracing.trace("Overloaded");
            throw e;
        }
        finally {
            final long latency = Clock.Global.nanoTime() - startTime;
            ClientRequestsMetricsHolder.writeMetrics.addNano(latency);
            ClientRequestsMetricsHolder.writeMetricsForLevel(consistencyLevel).addNano(latency);
            StorageProxy.updateCoordinatorWriteLatencyTableMetric(mutations, latency);
        }
    }

    /**
     * Writes hints for every non-counter mutation in {@code mutations}, in iteration order, and
     * then emits a trace message. Counter mutations are skipped.
     */
    private static void hintMutations(Collection<? extends IMutation> mutations) {
        mutations.stream()
                 .filter(candidate -> !(candidate instanceof CounterMutation))
                 .forEach(candidate -> StorageProxy.hintMutation((Mutation)candidate));
        Tracing.trace("Wrote hints to satisfy CL.ANY after no replicas acknowledged the write");
    }

    /**
     * Submits a hint for a single mutation to every replica (live or down) of its token for which
     * {@code shouldHint} returns true. No response handler is attached to the hint.
     *
     * @param mutation the mutation to hint
     */
    private static void hintMutation(Mutation mutation) {
        String ks = mutation.getKeyspaceName();
        Token writeToken = mutation.key().getToken();
        EndpointsForToken everyReplica = (EndpointsForToken)ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(ks), writeToken).all();
        EndpointsForToken hintable = (EndpointsForToken)everyReplica.filter(StorageProxy::shouldHint);
        StorageProxy.submitHint(mutation, hintable, null);
    }

    /**
     * Tells whether this node's broadcast address is among the write replicas (live or down)
     * for the token of the given mutation.
     *
     * @param mutation the mutation to test
     * @return {@code true} if the local broadcast address is one of the replica endpoints
     */
    public boolean appliesLocally(Mutation mutation) {
        String ks = mutation.getKeyspaceName();
        Token writeToken = mutation.key().getToken();
        InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
        EndpointsForToken everyReplica = (EndpointsForToken)ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(ks), writeToken).all();
        return everyReplica.endpoints().contains(self);
    }

    /**
     * Applies or dispatches a collection of view mutations and records the elapsed time in
     * {@code viewWriteMetrics}.
     * <p>
     * If the node is starting, joining or moving, the whole collection is stored as one local
     * batch. Otherwise each mutation is handled in one of three ways: it is skipped when no paired
     * endpoint is found; it is applied directly on this node when the paired endpoint is this
     * node, the node is joined and there are no pending replicas; or it is wrapped in a response
     * handler and sent asynchronously on the {@code VIEW_MUTATION} stage. Every mutation that was
     * not applied directly is stored in the local batchlog before the asynchronous send.
     * <p>
     * NOTE: this source is decompiler output. The decompiler (CFR) attached the warning
     * "Removed try catching itself - possible behaviour change" to this method, so the exception
     * flow here may differ from the original source.
     *
     * @param dataKey            key bytes from which the base token is computed via the partitioner
     * @param mutations          the mutations to apply
     * @param writeCommitLog     forwarded to {@code BatchlogManager.store} and {@code Mutation.apply}
     * @param baseComplete       forwarded to the view response handler wrapper
     * @param queryStartNanoTime forwarded to the response handlers
     */
    public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog, AtomicLong baseComplete, long queryStartNanoTime) throws UnavailableException, OverloadedException, WriteTimeoutException {
        Tracing.trace("Determining replicas for mutation");
        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
        long startTime = Clock.Global.nanoTime();
        try {
            TimeUUID batchUUID = TimeUUID.Generator.nextTimeUUID();
            if (StorageService.instance.isStarting() || StorageService.instance.isJoining() || StorageService.instance.isMoving()) {
                // Node is not in a settled state: only store the batch locally, send nothing.
                BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), mutations), writeCommitLog);
            } else {
                ArrayList<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
                // Starts as the full set; entries are removed as mutations are applied locally.
                HashSet<Mutation> nonLocalMutations = new HashSet<Mutation>(mutations);
                Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey);
                ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
                ReplicaPlan.ForWrite replicaPlan = ReplicaPlans.forLocalBatchlogWrite();
                // The cleanup is created with a count of mutations.size() and a callback that removes the batch from the batchlog.
                BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> StorageProxy.asyncRemoveFromBatchlog(replicaPlan, batchUUID));
                for (Mutation mutation : mutations) {
                    String keyspaceName = mutation.getKeyspaceName();
                    Token tk = mutation.key().getToken();
                    AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
                    Optional<Replica> pairedEndpoint = ViewUtils.getViewNaturalEndpoint(replicationStrategy, baseToken, tk);
                    EndpointsForToken pendingReplicas = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk, keyspaceName);
                    if (!pairedEndpoint.isPresent()) {
                        // No paired endpoint: the mutation is not sent, but it stays in nonLocalMutations
                        // and is therefore still written to the local batchlog below.
                        // The warning is only logged when there are no pending replicas.
                        if (!pendingReplicas.isEmpty()) continue;
                        logger.warn("Received base materialized view mutation for key {} that does not belong to this node. There is probably a range movement happening (move or decommission),but this node hasn't updated its ring metadata yet. Adding mutation to local batchlog to be replayed later.", (Object)mutation.key());
                        continue;
                    }
                    if (pairedEndpoint.get().isSelf() && StorageService.instance.isJoined() && pendingReplicas.isEmpty()) {
                        // Paired endpoint is this node and nothing is pending: apply directly and
                        // drop the mutation from the set that will go to the batchlog.
                        try {
                            mutation.apply(writeCommitLog);
                            nonLocalMutations.remove(mutation);
                            cleanup.decrement();
                            continue;
                        }
                        catch (Exception exc) {
                            logger.error("Error applying local view update: Mutation (keyspace {}, tables {}, partition key {})", new Object[]{mutation.getKeyspaceName(), mutation.getTableIds(), mutation.key()});
                            throw exc;
                        }
                    }
                    // Otherwise build a layout from the paired endpoint plus pending replicas and queue it for async sending.
                    ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(replicationStrategy, EndpointsForToken.of(tk, pairedEndpoint.get()), pendingReplicas);
                    wrappers.add(StorageProxy.wrapViewBatchResponseHandler(mutation, consistencyLevel, consistencyLevel, liveAndDown, baseComplete, WriteType.BATCH, cleanup, queryStartNanoTime));
                }
                // Store the not-locally-applied mutations in the batchlog before any of them is sent.
                if (!nonLocalMutations.isEmpty()) {
                    BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), nonLocalMutations), writeCommitLog);
                }
                if (!wrappers.isEmpty()) {
                    StorageProxy.asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION);
                }
            }
        }
        finally {
            // Latency is recorded whether the method returns normally or throws.
            ClientRequestsMetricsHolder.viewWriteMetrics.addNano(Clock.Global.nanoTime() - startTime);
        }
    }

    /**
     * Write entry point that applies the partition denylist, runs triggers and then chooses
     * between an atomic (batchlog-backed) write and a regular write.
     * <p>
     * When the partition denylist is enabled for writes, the first (table, key) pair that is not
     * permitted causes the write to be rejected. If the trigger executor returns a non-null
     * collection, that collection is written atomically. Otherwise the original mutations are
     * written atomically when {@code mutateAtomically} is set or when the view manager reports
     * that the updates affect a view, and written with {@code mutate} in every other case.
     *
     * @param mutations          the mutations to apply; the keyspace of the first element is used
     *                           for the view check, so the list is expected to be non-empty
     * @param consistencyLevel   consistency level for the write and for per-level metrics
     * @param mutateAtomically   whether the caller requires an atomic write
     * @param queryStartNanoTime forwarded to the write path
     * @throws InvalidRequestException if a mutation targets a denylisted partition
     */
    public static void mutateWithTriggers(List<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically, long queryStartNanoTime) throws WriteTimeoutException, WriteFailureException, UnavailableException, OverloadedException, InvalidRequestException {
        if (DatabaseDescriptor.getPartitionDenylistEnabled() && DatabaseDescriptor.getDenylistWritesEnabled()) {
            for (IMutation mutation : mutations) {
                for (TableId tid : mutation.getTableIds()) {
                    if (partitionDenylist.isKeyPermitted(tid, mutation.key().getKey())) {
                        continue;
                    }
                    denylistMetrics.incrementWritesRejected();
                    TableMetadata tmd = Schema.instance.getTableMetadata(tid);
                    throw new InvalidRequestException(String.format("Unable to write to denylisted partition [0x%s] in %s/%s", mutation.key().toString(), tmd.keyspace, tmd.name));
                }
            }
        }
        Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations);
        boolean updatesView = Keyspace.open(mutations.iterator().next().getKeyspaceName()).viewManager.updatesAffectView(mutations, true);
        long size = IMutation.dataSize(mutations);
        ClientRequestsMetricsHolder.writeMetrics.mutationSize.update(size);
        ClientRequestsMetricsHolder.writeMetricsForLevel(consistencyLevel).mutationSize.update(size);
        if (augmented != null) {
            StorageProxy.mutateAtomically(augmented, consistencyLevel, updatesView, queryStartNanoTime);
        } else if (mutateAtomically || updatesView) {
            // mutateAtomically takes Collection<Mutation>; the list is declared as
            // List<? extends IMutation>, so an explicit unchecked cast is required for this call
            // to type-check. It has no runtime effect because of erasure.
            @SuppressWarnings("unchecked")
            Collection<Mutation> plainMutations = (Collection<Mutation>)mutations;
            StorageProxy.mutateAtomically(plainMutations, consistencyLevel, updatesView, queryStartNanoTime);
        } else {
            StorageProxy.mutate(mutations, consistencyLevel, queryStartNanoTime);
        }
    }

    /**
     * Writes a set of mutations as a logged batch: the batch is first written synchronously to the
     * batchlog, then the individual mutations are written synchronously on the {@code MUTATION}
     * stage. Unavailable, timeout and failure outcomes are counted in the write metrics and
     * traced before being rethrown; latency is recorded in every case.
     *
     * @param mutations              the mutations forming the batch
     * @param consistency_level      consistency level requested for the mutations
     * @param requireQuorumForRemove when true the batch consistency level is QUORUM, unless the
     *                               requested level is ALL or EACH_QUORUM, which are kept as is
     * @param queryStartNanoTime     forwarded to the response handlers
     * @throws AssertionError if any mutation's keyspace uses transient replicas
     */
    public static void mutateAtomically(Collection<Mutation> mutations, ConsistencyLevel consistency_level, boolean requireQuorumForRemove, long queryStartNanoTime) throws UnavailableException, OverloadedException, WriteTimeoutException {
        Tracing.trace("Determining replicas for atomic batch");
        long begin = Clock.Global.nanoTime();
        ArrayList<WriteResponseHandlerWrapper> handlers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
        for (Mutation candidate : mutations) {
            if (Keyspace.open(candidate.getKeyspaceName()).getReplicationStrategy().hasTransientReplicas()) {
                throw new AssertionError((Object)"Logged batches are unsupported with transient replication");
            }
        }
        try {
            ConsistencyLevel batchLevel;
            if (requireQuorumForRemove) {
                batchLevel = ConsistencyLevel.QUORUM;
            } else {
                batchLevel = consistency_level;
            }
            // ALL and EACH_QUORUM are never replaced by QUORUM.
            switch (consistency_level) {
                case ALL:
                case EACH_QUORUM: {
                    batchLevel = consistency_level;
                }
            }
            boolean batchlogAtAny = batchLevel == ConsistencyLevel.ANY;
            ReplicaPlan.ForWrite batchlogPlan = ReplicaPlans.forBatchlogWrite(batchlogAtAny);
            TimeUUID batchId = TimeUUID.Generator.nextTimeUUID();
            BatchlogResponseHandler.BatchlogCleanup batchlogCleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> StorageProxy.asyncRemoveFromBatchlog(batchlogPlan, batchId));
            for (Mutation member : mutations) {
                handlers.add(StorageProxy.wrapBatchResponseHandler(member, consistency_level, batchLevel, WriteType.BATCH, batchlogCleanup, queryStartNanoTime));
            }
            // The batchlog write must complete before the mutations themselves are sent.
            StorageProxy.syncWriteToBatchlog(mutations, batchlogPlan, batchId, queryStartNanoTime);
            StorageProxy.syncWriteBatchedMutations(handlers, Stage.MUTATION);
        }
        catch (UnavailableException unavailable) {
            ClientRequestsMetricsHolder.writeMetrics.unavailables.mark();
            ClientRequestsMetricsHolder.writeMetricsForLevel(consistency_level).unavailables.mark();
            Tracing.trace("Unavailable");
            throw unavailable;
        }
        catch (WriteTimeoutException timeout) {
            ClientRequestsMetricsHolder.writeMetrics.timeouts.mark();
            ClientRequestsMetricsHolder.writeMetricsForLevel(consistency_level).timeouts.mark();
            Tracing.trace("Write timeout; received {} of {} required replies", (Object)timeout.received, (Object)timeout.blockFor);
            throw timeout;
        }
        catch (WriteFailureException failure) {
            ClientRequestsMetricsHolder.writeMetrics.failures.mark();
            ClientRequestsMetricsHolder.writeMetricsForLevel(consistency_level).failures.mark();
            Tracing.trace("Write failure; received {} of {} required replies", (Object)failure.received, (Object)failure.blockFor);
            throw failure;
        }
        finally {
            long elapsed = Clock.Global.nanoTime() - begin;
            ClientRequestsMetricsHolder.writeMetrics.addNano(elapsed);
            ClientRequestsMetricsHolder.writeMetricsForLevel(consistency_level).addNano(elapsed);
            StorageProxy.updateCoordinatorWriteLatencyTableMetric(mutations, elapsed);
        }
    }

    /**
     * Records {@code latency} once in the {@code coordinatorWriteLatency} metric of every distinct
     * table store touched by the given mutations. Any exception raised while doing so is logged
     * as a warning and not propagated.
     *
     * @param mutations the mutations whose tables should be updated; {@code null} is a no-op
     * @param latency   the latency to record, in nanoseconds
     */
    private static void updateCoordinatorWriteLatencyTableMetric(Collection<? extends IMutation> mutations, long latency) {
        if (mutations == null) {
            return;
        }
        try {
            // Tracks stores that were already updated so each one is counted only once.
            HashSet<ColumnFamilyStore> alreadyUpdated = new HashSet<ColumnFamilyStore>();
            for (IMutation m : mutations) {
                for (TableId tableId : m.getTableIds()) {
                    ColumnFamilyStore store = Keyspace.open(m.getKeyspaceName()).getColumnFamilyStore(tableId);
                    if (alreadyUpdated.add(store)) {
                        store.metric.coordinatorWriteLatency.update(latency, TimeUnit.NANOSECONDS);
                    }
                }
            }
        }
        catch (Exception ex) {
            logger.warn("Exception occurred updating coordinatorWriteLatency metric", (Throwable)ex);
        }
    }

    /**
     * Stores a batch built from {@code mutations} on every live-or-down replica of the batchlog
     * plan and blocks on the response handler until it completes.
     * The local replica stores the batch via {@code performLocally}; remote replicas receive a
     * {@code BATCH_STORE_REQ} message.
     *
     * @param mutations          the mutations that make up the batch
     * @param replicaPlan        the batchlog replica plan
     * @param uuid               the batch id
     * @param queryStartNanoTime forwarded to the response handler
     */
    private static void syncWriteToBatchlog(Collection<Mutation> mutations, ReplicaPlan.ForWrite replicaPlan, TimeUUID uuid, long queryStartNanoTime) throws WriteTimeoutException, WriteFailureException {
        WriteResponseHandler handler = new WriteResponseHandler(replicaPlan, WriteType.BATCH_LOG, null, queryStartNanoTime);
        Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
        Message<Batch> storeRequest = Message.out(Verb.BATCH_STORE_REQ, batch);
        for (Replica recipient : replicaPlan.liveAndDown()) {
            logger.trace("Sending batchlog store request {} to {} for {} mutations", new Object[]{batch.id, recipient, batch.size()});
            if (!recipient.isSelf()) {
                MessagingService.instance().sendWithCallback(storeRequest, recipient.endpoint(), (RequestCallback)handler);
            } else {
                StorageProxy.performLocally(Stage.MUTATION, recipient, () -> BatchlogManager.store(batch), handler, "Batchlog store");
            }
        }
        // Block until the handler is satisfied or signals a timeout/failure.
        handler.get();
    }

    /**
     * Asks every contact of the batchlog plan to remove the batch with the given id, without
     * waiting for any reply. The local replica removes it via {@code performLocally}; remote
     * replicas receive a {@code BATCH_REMOVE_REQ} message.
     *
     * @param replicaPlan the batchlog replica plan whose contacts are notified
     * @param uuid        the id of the batch to remove
     */
    private static void asyncRemoveFromBatchlog(ReplicaPlan.ForWrite replicaPlan, TimeUUID uuid) {
        Message<TimeUUID> removeRequest = Message.out(Verb.BATCH_REMOVE_REQ, uuid);
        for (Replica contact : (EndpointsForToken)replicaPlan.contacts()) {
            if (logger.isTraceEnabled()) {
                logger.trace("Sending batchlog remove request {} to {}", (Object)uuid, (Object)contact);
            }
            if (!contact.isSelf()) {
                MessagingService.instance().send(removeRequest, contact.endpoint());
            } else {
                StorageProxy.performLocally(Stage.MUTATION, contact, () -> BatchlogManager.remove(uuid), "Batchlog remove");
            }
        }
    }

    /**
     * Sends each wrapped mutation to all of its live-or-down replicas without waiting for the
     * responses. An overload or write timeout raised while sending is reported to that wrapper's
     * handler as a failure from the local node instead of being propagated.
     *
     * @param wrappers        the mutations with their response handlers
     * @param localDataCenter name of the local datacenter, forwarded to the send routine
     * @param stage           the stage used for the local part of the write
     */
    private static void asyncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage) {
        for (WriteResponseHandlerWrapper entry : wrappers) {
            Replicas.temporaryAssertFull(entry.handler.replicaPlan.liveAndDown());
            ReplicaPlan.ForWrite contactEveryone = entry.handler.replicaPlan.withContacts(entry.handler.replicaPlan.liveAndDown());
            try {
                StorageProxy.sendToHintedReplicas(entry.mutation, contactEveryone, entry.handler, localDataCenter, stage);
            }
            catch (OverloadedException | WriteTimeoutException sendError) {
                InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
                entry.handler.onFailure(self, RequestFailureReason.forException(sendError));
            }
        }
    }

    /**
     * Sends every wrapped mutation to all of its live-or-down replicas, then blocks on each
     * handler in turn. All sends are issued before the first wait.
     *
     * @param wrappers the mutations with their response handlers
     * @param stage    the stage used for the local part of the write
     */
    private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, Stage stage) throws WriteTimeoutException, OverloadedException {
        String localDc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
        // Phase 1: dispatch everything.
        for (WriteResponseHandlerWrapper entry : wrappers) {
            EndpointsForToken everyReplica = entry.handler.replicaPlan.liveAndDown();
            Replicas.temporaryAssertFull(everyReplica);
            StorageProxy.sendToHintedReplicas(entry.mutation, entry.handler.replicaPlan.withContacts(everyReplica), entry.handler, localDc, stage);
        }
        // Phase 2: wait for each handler.
        for (WriteResponseHandlerWrapper entry : wrappers) {
            entry.handler.get();
        }
    }

    /**
     * Builds a write replica plan for the mutation, marks the request as local or remote in the
     * write metrics depending on whether this node is in the plan, creates the response handler
     * from the replication strategy and hands the write to {@code performer}.
     *
     * @param mutation           the mutation to write
     * @param consistencyLevel   consistency level used to build the replica plan
     * @param localDataCenter    name of the local datacenter, forwarded to the performer
     * @param performer          the strategy that actually performs the write
     * @param callback           callback handed to the response handler
     * @param writeType          the type of write
     * @param queryStartNanoTime forwarded to the response handler
     * @return the response handler for the write; the caller decides whether to wait on it
     */
    public static AbstractWriteResponseHandler<IMutation> performWrite(IMutation mutation, ConsistencyLevel consistencyLevel, String localDataCenter, WritePerformer performer, Runnable callback, WriteType writeType, long queryStartNanoTime) {
        String ksName = mutation.getKeyspaceName();
        Keyspace ks = Keyspace.open(ksName);
        Token writeToken = mutation.key().getToken();
        ReplicaPlan.ForWrite plan = ReplicaPlans.forWrite(ks, consistencyLevel, writeToken, ReplicaPlans.writeNormal);
        boolean coordinatorIsReplica = plan.lookup(FBUtilities.getBroadcastAddressAndPort()) != null;
        if (coordinatorIsReplica) {
            ClientRequestsMetricsHolder.writeMetrics.localRequests.mark();
        } else {
            ClientRequestsMetricsHolder.writeMetrics.remoteRequests.mark();
        }
        AbstractReplicationStrategy strategy = plan.replicationStrategy();
        AbstractWriteResponseHandler<IMutation> handler = strategy.getWriteResponseHandler(plan, callback, writeType, mutation.hintOnFailure(), queryStartNanoTime);
        performer.apply(mutation, plan, handler, localDataCenter);
        return handler;
    }

    /**
     * Builds the response handler pair for one mutation of a logged batch: a regular write handler
     * wrapped in a {@code BatchlogResponseHandler} that requires
     * {@code batchConsistencyLevel.blockFor(...)} acknowledgements. Also marks the request as
     * local or remote in the write metrics.
     *
     * @param mutation              the mutation to wrap
     * @param consistencyLevel      consistency level used for the write replica plan
     * @param batchConsistencyLevel consistency level that determines the batch handler's block-for count
     * @param writeType             the type of write
     * @param cleanup               the batchlog cleanup passed to the batch handler
     * @param queryStartNanoTime    forwarded to both handlers
     * @return the wrapper holding the batch handler and the mutation
     */
    private static WriteResponseHandlerWrapper wrapBatchResponseHandler(Mutation mutation, ConsistencyLevel consistencyLevel, ConsistencyLevel batchConsistencyLevel, WriteType writeType, BatchlogResponseHandler.BatchlogCleanup cleanup, long queryStartNanoTime) {
        Keyspace ks = Keyspace.open(mutation.getKeyspaceName());
        Token writeToken = mutation.key().getToken();
        ReplicaPlan.ForWrite plan = ReplicaPlans.forWrite(ks, consistencyLevel, writeToken, ReplicaPlans.writeNormal);
        boolean coordinatorIsReplica = plan.lookup(FBUtilities.getBroadcastAddressAndPort()) != null;
        if (coordinatorIsReplica) {
            ClientRequestsMetricsHolder.writeMetrics.localRequests.mark();
        } else {
            ClientRequestsMetricsHolder.writeMetrics.remoteRequests.mark();
        }
        AbstractReplicationStrategy strategy = plan.replicationStrategy();
        AbstractWriteResponseHandler<IMutation> inner = strategy.getWriteResponseHandler(plan, null, writeType, mutation, queryStartNanoTime);
        int requiredAcks = batchConsistencyLevel.blockFor(strategy);
        BatchlogResponseHandler<IMutation> outer = new BatchlogResponseHandler<IMutation>(inner, requiredAcks, cleanup, queryStartNanoTime);
        return new WriteResponseHandlerWrapper(outer, mutation);
    }

    /**
     * Builds the response handler pair for one view mutation. The inner write handler's callback
     * records the elapsed milliseconds since {@code baseComplete} (never negative) in the
     * {@code viewWriteLatency} metric; the outer handler is a {@code ViewWriteMetricsWrapped}.
     *
     * @param mutation              the view mutation to wrap
     * @param consistencyLevel      consistency level used for the write replica plan
     * @param batchConsistencyLevel consistency level that determines the outer handler's block-for count
     * @param liveAndDown           replica layout the plan is built from
     * @param baseComplete          millisecond timestamp read when the callback runs
     * @param writeType             the type of write
     * @param cleanup               the batchlog cleanup passed to the outer handler
     * @param queryStartNanoTime    forwarded to both handlers
     * @return the wrapper holding the outer handler and the mutation
     */
    private static WriteResponseHandlerWrapper wrapViewBatchResponseHandler(Mutation mutation, ConsistencyLevel consistencyLevel, ConsistencyLevel batchConsistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, AtomicLong baseComplete, WriteType writeType, BatchlogResponseHandler.BatchlogCleanup cleanup, long queryStartNanoTime) {
        Keyspace ks = Keyspace.open(mutation.getKeyspaceName());
        ReplicaPlan.ForWrite plan = ReplicaPlans.forWrite(ks, consistencyLevel, liveAndDown, ReplicaPlans.writeAll);
        AbstractReplicationStrategy strategy = plan.replicationStrategy();
        Runnable recordViewLatency = () -> {
            long sinceBase = Clock.Global.currentTimeMillis() - baseComplete.get();
            long clamped = Math.max(0L, sinceBase);
            ClientRequestsMetricsHolder.viewWriteMetrics.viewWriteLatency.update(clamped, TimeUnit.MILLISECONDS);
        };
        AbstractWriteResponseHandler<IMutation> inner = strategy.getWriteResponseHandler(plan, recordViewLatency, writeType, mutation, queryStartNanoTime);
        int requiredAcks = batchConsistencyLevel.blockFor(strategy);
        ViewWriteMetricsWrapped outer = new ViewWriteMetricsWrapped(inner, requiredAcks, cleanup, queryStartNanoTime);
        return new WriteResponseHandlerWrapper(outer, mutation);
    }

    /**
     * Delivers {@code mutation} to every contact of {@code plan}.
     * <p>
     * For each contact the hint-overload check runs first. A contact the plan reports as not
     * alive is marked expired on the handler and, if {@code shouldHint} allows, collected for
     * hinting. A live contact is either this node (written locally on {@code stage}), a replica
     * in {@code localDataCenter} (sent directly), or a replica in another datacenter (grouped by
     * datacenter and sent through {@code sendMessagesToNonlocalDC}).
     * <p>
     * Work is issued in this order once all contacts are classified: hints, local write,
     * local-datacenter sends, other-datacenter sends.
     *
     * @param mutation        the mutation to deliver
     * @param plan            the replica plan whose contacts receive the mutation
     * @param responseHandler handler that receives responses, expirations and failures
     * @param localDataCenter name of the local datacenter
     * @param stage           stage on which the local write, if any, is performed
     * @throws OverloadedException if the hint-overload check fails for any contact
     */
    public static void sendToHintedReplicas(Mutation mutation, ReplicaPlan.ForWrite plan, AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter, Stage stage) throws OverloadedException {
        // All collections are allocated lazily; a given write typically needs only some of them.
        List<Replica> localDc = null;
        Map<String, Collection<Replica>> dcGroups = null;
        // The outbound message is only created if at least one remote replica is live.
        Message<Mutation> message = null;
        boolean insertLocal = false;
        Replica localReplica = null;
        List<Replica> endpointsToHint = null;
        Mutation.serializer.prepareSerializedBuffer(mutation, MessagingService.current_version);
        for (Replica destination : (EndpointsForToken)plan.contacts()) {
            StorageProxy.checkHintOverload(destination);
            if (!plan.isAlive(destination)) {
                // The request will not be sent, so count it as expired right away.
                responseHandler.expired();
                if (StorageProxy.shouldHint(destination)) {
                    if (endpointsToHint == null) {
                        endpointsToHint = new ArrayList<>();
                    }
                    endpointsToHint.add(destination);
                }
                continue;
            }
            if (destination.isSelf()) {
                insertLocal = true;
                localReplica = destination;
                continue;
            }
            if (message == null) {
                message = Message.outWithFlag(Verb.MUTATION_REQ, mutation, MessageFlag.CALL_BACK_ON_FAILURE);
            }
            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
            if (localDataCenter.equals(dc)) {
                if (localDc == null) {
                    localDc = new ArrayList<>(((EndpointsForToken)plan.contacts()).size());
                }
                localDc.add(destination);
            } else {
                if (dcGroups == null) {
                    dcGroups = new HashMap<>();
                }
                dcGroups.computeIfAbsent(dc, ignored -> new ArrayList<>(3)).add(destination);
            }
        }
        if (endpointsToHint != null) {
            StorageProxy.submitHint(mutation, EndpointsForToken.copyOf(mutation.key().getToken(), endpointsToHint), responseHandler);
        }
        if (insertLocal) {
            Preconditions.checkNotNull(localReplica);
            StorageProxy.performLocally(stage, localReplica, mutation::apply, responseHandler, mutation);
        }
        if (localDc != null) {
            for (Replica destination : localDc) {
                MessagingService.instance().sendWriteWithCallback(message, destination, responseHandler);
            }
        }
        if (dcGroups != null) {
            for (Collection<Replica> dcTargets : dcGroups.values()) {
                StorageProxy.sendMessagesToNonlocalDC(message, EndpointsForToken.copyOf(mutation.key().getToken(), dcTargets), responseHandler);
            }
        }
    }

    /**
     * Throws when hinting is saturated for the given destination: the total number of hints in
     * progress exceeds {@code maxHintsInProgress}, this destination already has at least one
     * hint in progress, and {@code shouldHint} accepts the destination.
     *
     * @param destination the replica about to be written to
     * @throws OverloadedException if all three conditions hold
     */
    private static void checkHintOverload(Replica destination) {
        if (!(StorageMetrics.totalHintsInProgress.getCount() > (long)maxHintsInProgress)) {
            return;
        }
        if (!(StorageProxy.getHintsInProgressFor(destination.endpoint()).get() > 0)) {
            return;
        }
        if (!StorageProxy.shouldHint(destination)) {
            return;
        }
        throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.getCount() + " destination: " + destination + " destination hints: " + StorageProxy.getHintsInProgressFor(destination.endpoint()).get());
    }

    /**
     * Sends a write to the replicas of one remote datacenter through a single target replica.
     * With more than one target, one replica is picked via {@code pickReplica}; the others are
     * registered as callbacks and attached to the message as forwarding information, all sharing
     * the message's id. With a single target the message is sent to it unchanged.
     *
     * @param message the mutation message to send
     * @param targets the replicas of the remote datacenter
     * @param handler the handler registered for the sent and forwarded messages
     */
    private static void sendMessagesToNonlocalDC(Message<? extends IMutation> message, EndpointsForToken targets, AbstractWriteResponseHandler<IMutation> handler) {
        final Replica target;
        if (!(targets.size() > 1)) {
            target = targets.get(0);
        } else {
            target = StorageProxy.pickReplica(targets);
            EndpointsForToken forwarded = (EndpointsForToken)targets.filter(r -> r != target, targets.size());
            for (Replica recipient : forwarded) {
                MessagingService.instance().callbacks.addWithExpiration(handler, message, recipient);
                logger.trace("Adding FWD message to {}@{}", (Object)message.id(), (Object)recipient);
            }
            // Every forwarded copy reuses the id of the original message.
            long[] forwardedIds = new long[forwarded.size()];
            Arrays.fill(forwardedIds, message.id());
            ForwardingInfo forwarding = new ForwardingInfo(forwarded.endpointList(), forwardedIds);
            message = message.withForwardTo(forwarding);
        }
        Tracing.trace("Sending mutation to remote replica {}", (Object)target);
        MessagingService.instance().sendWriteWithCallback(message, target, handler);
        logger.trace("Sending message to {}@{}", (Object)message.id(), (Object)target);
    }

    /**
     * Picks a random replica from {@code targets}, restricted to those whose
     * {@code DynamicEndpointSnitch} severity is exactly zero when at least one such replica exists.
     *
     * @param targets the candidate replicas
     * @return the randomly selected replica
     */
    private static Replica pickReplica(EndpointsForToken targets) {
        EndpointsForToken zeroSeverity = (EndpointsForToken)targets.filter(r -> DynamicEndpointSnitch.getSeverity(r.endpoint()) == 0.0);
        EndpointsForToken pool;
        if (zeroSeverity.isEmpty()) {
            pool = targets;
        } else {
            pool = zeroSeverity;
        }
        int choice = ThreadLocalRandom.current().nextInt(0, pool.size());
        return pool.get(choice);
    }

    /**
     * Runs {@code runnable} on the given stage as a local mutation task with no response handler.
     * Any exception thrown by the runnable is logged and not propagated.
     *
     * @param stage        the stage asked to execute the task
     * @param localReplica the local replica the task is created for
     * @param runnable     the work to perform
     * @param description  text returned by the task's {@code description()}
     */
    private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, final String description) {
        LocalMutationRunnable localTask = new LocalMutationRunnable(localReplica){

            @Override
            protected Verb verb() {
                return Verb.MUTATION_REQ;
            }

            @Override
            public String description() {
                return description;
            }

            @Override
            public void runMayThrow() {
                try {
                    runnable.run();
                }
                catch (Exception failure) {
                    // Fire-and-forget: nothing is waiting on this task, so only log.
                    logger.error("Failed to apply mutation locally : ", (Throwable)failure);
                }
            }
        };
        stage.maybeExecuteImmediately(localTask);
    }

    /**
     * Runs {@code runnable} on the given stage as a local mutation task and reports the outcome to
     * {@code handler}: {@code onResponse(null)} on success, {@code onFailure} with the local
     * broadcast address on any exception. Exceptions other than {@code WriteTimeoutException}
     * are also logged as errors.
     *
     * @param stage        the stage asked to execute the task
     * @param localReplica the local replica the task is created for
     * @param runnable     the work to perform
     * @param handler      the callback notified of success or failure
     * @param description  object whose {@code toString()} is returned by the task's {@code description()}
     */
    private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, final RequestCallback<?> handler, final Object description) {
        LocalMutationRunnable localTask = new LocalMutationRunnable(localReplica){

            @Override
            protected Verb verb() {
                return Verb.MUTATION_REQ;
            }

            @Override
            public String description() {
                return description.toString();
            }

            @Override
            public void runMayThrow() {
                try {
                    runnable.run();
                    handler.onResponse(null);
                }
                catch (Exception failure) {
                    boolean timedOut = failure instanceof WriteTimeoutException;
                    if (!timedOut) {
                        logger.error("Failed to apply mutation locally : ", (Throwable)failure);
                    }
                    handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.forException(failure));
                }
            }
        };
        stage.maybeExecuteImmediately(localTask);
    }

    /**
     * Routes a counter mutation to a suitable replica. If the chosen replica is this node the
     * mutation is applied via {@code applyCounterMutationOnCoordinator}; otherwise it is forwarded
     * to that replica as a {@code COUNTER_MUTATION_REQ} and counted as a remote request.
     *
     * @param cm                 the counter mutation
     * @param localDataCenter    name of the local datacenter
     * @param queryStartNanoTime forwarded to the response handler
     * @return the response handler for the write
     */
    public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime) throws UnavailableException, OverloadedException {
        Replica leader = StorageProxy.findSuitableReplica(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency());
        if (leader.isSelf()) {
            return StorageProxy.applyCounterMutationOnCoordinator(cm, localDataCenter, queryStartNanoTime);
        }
        String ksName = cm.getKeyspaceName();
        Keyspace ks = Keyspace.open(ksName);
        Token writeToken = cm.key().getToken();
        // The returned plan is deliberately discarded; the call is made only for whatever
        // checks plan construction performs (presumably an availability check - confirm).
        ReplicaPlans.forWrite(ks, cm.consistency(), writeToken, ReplicaPlans.writeAll);
        ClientRequestsMetricsHolder.writeMetrics.remoteRequests.mark();
        ReplicaPlan.ForWrite forwardingPlan = ReplicaPlans.forForwardingCounterWrite(ks, writeToken, leader);
        WriteResponseHandler<IMutation> handler = new WriteResponseHandler<IMutation>(forwardingPlan, WriteType.COUNTER, null, queryStartNanoTime);
        Tracing.trace("Enqueuing counter update to {}", (Object)leader);
        Message<CounterMutation> forward = Message.outWithFlag(Verb.COUNTER_MUTATION_REQ, cm, MessageFlag.CALL_BACK_ON_FAILURE);
        MessagingService.instance().sendWriteWithCallback(forward, leader, handler);
        return handler;
    }

    /**
     * Chooses a replica for the given key from the natural replicas that are both RPC-ready and
     * alive. A random replica in {@code localDataCenter} is preferred; if there is none and the
     * consistency level is not datacenter-local, the first replica in snitch proximity order is
     * returned.
     *
     * @param keyspaceName    the keyspace of the write
     * @param key             the partition key
     * @param localDataCenter name of the local datacenter
     * @param cl              the consistency level of the write
     * @return the chosen replica
     * @throws UnavailableException if no replica is RPC-ready and alive, or if none is in the
     *                              local datacenter and {@code cl} is datacenter-local
     */
    private static Replica findSuitableReplica(String keyspaceName, DecoratedKey key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException {
        Keyspace ks = Keyspace.open(keyspaceName);
        IEndpointSnitch proximity = DatabaseDescriptor.getEndpointSnitch();
        AbstractReplicationStrategy strategy = ks.getReplicationStrategy();
        EndpointsForToken natural = strategy.getNaturalReplicasForToken(key);
        EndpointsForToken rpcReady = (EndpointsForToken)natural.filter(r -> StorageService.instance.isRpcReady(r.endpoint()));
        EndpointsForToken candidates = (EndpointsForToken)rpcReady.filter(r -> FailureDetector.instance.isAlive(r.endpoint()));
        if (candidates.isEmpty()) {
            throw UnavailableException.create(cl, cl.blockFor(strategy), 0);
        }
        ArrayList<Replica> sameDc = new ArrayList<Replica>(candidates.size());
        for (Replica candidate : candidates) {
            if (proximity.getDatacenter(candidate).equals(localDataCenter)) {
                sameDc.add(candidate);
            }
        }
        if (!sameDc.isEmpty()) {
            int choice = ThreadLocalRandom.current().nextInt(sameDc.size());
            return (Replica)sameDc.get(choice);
        }
        // Nothing usable in the local datacenter.
        if (cl.isDatacenterLocal()) {
            throw UnavailableException.create(cl, cl.blockFor(strategy), 0);
        }
        EndpointsForToken closestFirst = proximity.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), candidates);
        return closestFirst.get(0);
    }

    /**
     * Performs a counter write using {@code counterWritePerformer}, at the mutation's own
     * consistency level and with the supplied completion callback.
     *
     * @param cm                 the counter mutation
     * @param localDataCenter    name of the local datacenter
     * @param callback           callback handed to the response handler
     * @param queryStartNanoTime forwarded to the response handler
     * @return the response handler for the write
     */
    public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback, long queryStartNanoTime) throws UnavailableException, OverloadedException {
        ConsistencyLevel level = cm.consistency();
        AbstractWriteResponseHandler<IMutation> handler = StorageProxy.performWrite(cm, level, localDataCenter, counterWritePerformer, callback, WriteType.COUNTER, queryStartNanoTime);
        return handler;
    }

    /**
     * Performs a counter write using {@code counterWriteOnCoordinatorPerformer}, at the mutation's
     * own consistency level and without a completion callback.
     *
     * @param cm                 the counter mutation
     * @param localDataCenter    name of the local datacenter
     * @param queryStartNanoTime forwarded to the response handler
     * @return the response handler for the write
     */
    public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter, long queryStartNanoTime) throws UnavailableException, OverloadedException {
        ConsistencyLevel level = cm.consistency();
        AbstractWriteResponseHandler<IMutation> handler = StorageProxy.performWrite(cm, level, localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER, queryStartNanoTime);
        return handler;
    }

    /**
     * Creates a droppable task that applies a counter mutation locally, acknowledges the local
     * write on {@code responseHandler}, and then sends the resulting mutation to the plan's
     * replicas on the {@code COUNTER_MUTATION} stage.
     *
     * @param mutation        the mutation to apply; asserted to be a {@link CounterMutation}
     * @param replicaPlan     the replica plan used for the follow-up send
     * @param responseHandler the handler for the write
     * @param localDataCenter name of the local datacenter
     * @return the task; nothing runs until it is executed
     */
    private static Runnable counterWriteTask(final IMutation mutation, final ReplicaPlan.ForWrite replicaPlan, final AbstractWriteResponseHandler<IMutation> responseHandler, final String localDataCenter) {
        DroppableRunnable task = new DroppableRunnable(Verb.COUNTER_MUTATION_REQ){

            @Override
            public void runMayThrow() throws OverloadedException, WriteTimeoutException {
                assert (mutation instanceof CounterMutation);
                CounterMutation counter = (CounterMutation)mutation;
                Mutation applied = counter.applyCounterMutation();
                // Acknowledge the local application before fanning out to the replicas.
                responseHandler.onResponse(null);
                StorageProxy.sendToHintedReplicas(applied, replicaPlan, responseHandler, localDataCenter, Stage.COUNTER_MUTATION);
            }
        };
        return task;
    }

    /**
     * Tells whether every command in the list targets a local system keyspace.
     *
     * @param cmds the read commands to inspect
     * @return {@code true} if all commands (vacuously, for an empty list) target a local system keyspace
     */
    private static boolean systemKeyspaceQuery(List<? extends ReadCommand> cmds) {
        return cmds.stream().allMatch(cmd -> SchemaConstants.isLocalSystemKeyspace(cmd.metadata().keyspace));
    }

    /**
     * Reads a single partition by wrapping the command in a one-element group, delegating to
     * {@code read} and extracting the only element of the result.
     *
     * @param command            the single-partition read command
     * @param consistencyLevel   consistency level for the read
     * @param queryStartNanoTime forwarded to {@code read}
     * @return the row iterator for the requested partition
     */
    public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException {
        SinglePartitionReadCommand.Group singleton = SinglePartitionReadCommand.Group.one(command);
        PartitionIterator partitions = StorageProxy.read(singleton, consistencyLevel, queryStartNanoTime);
        return PartitionIterators.getOnlyElement(partitions, command);
    }

    /**
     * Performs a group of single-partition reads.
     * <p>
     * The read is refused with {@code IsBootstrappingException} when it is not safe to read (the
     * refusal is counted as unavailable and logged). When the partition denylist is enabled for
     * reads, the first command whose key is not permitted is rejected. Otherwise the read goes
     * through the Paxos path for serial consistency levels and the regular path for all others.
     *
     * @param group              the commands to execute
     * @param consistencyLevel   consistency level for the read
     * @param queryStartNanoTime forwarded to the read path
     * @return an iterator over the resulting partitions
     * @throws InvalidRequestException if a command targets a denylisted partition
     */
    public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException {
        boolean safe = StorageProxy.isSafeToPerformRead(group.queries);
        if (!safe) {
            ClientRequestsMetricsHolder.readMetrics.unavailables.mark();
            ClientRequestsMetricsHolder.readMetricsForLevel(consistencyLevel).unavailables.mark();
            IsBootstrappingException bootstrapping = new IsBootstrappingException();
            StorageProxy.logRequestException(bootstrapping, group.queries);
            throw bootstrapping;
        }
        if (DatabaseDescriptor.getPartitionDenylistEnabled() && DatabaseDescriptor.getDenylistReadsEnabled()) {
            for (SinglePartitionReadCommand query : group.queries) {
                boolean permitted = partitionDenylist.isKeyPermitted(query.metadata().id, query.partitionKey().getKey());
                if (!permitted) {
                    denylistMetrics.incrementReadsRejected();
                    throw new InvalidRequestException(String.format("Unable to read denylisted partition [0x%s] in %s/%s", query.partitionKey().toString(), query.metadata().keyspace, query.metadata().name));
                }
            }
        }
        if (consistencyLevel.isSerialConsistency()) {
            return StorageProxy.readWithPaxos(group, consistencyLevel, queryStartNanoTime);
        }
        return StorageProxy.readRegular(group, consistencyLevel, queryStartNanoTime);
    }

    /**
     * Tells whether the given queries may be executed: either reads are safe in general, or every
     * query targets a local system keyspace.
     *
     * @param queries the read commands about to be executed
     * @return {@code true} if the read may proceed
     */
    public static boolean isSafeToPerformRead(List<SinglePartitionReadCommand> queries) {
        if (StorageProxy.isSafeToPerformRead()) {
            return true;
        }
        return StorageProxy.systemKeyspaceQuery(queries);
    }

    /**
     * Reads are considered safe whenever {@code StorageService} is not in bootstrap mode.
     */
    public static boolean isSafeToPerformRead() {
        boolean bootstrapping = StorageService.instance.isBootstrapMode();
        return !bootstrapping;
    }

    /**
     * Dispatches a serial read to {@code Paxos.read} when {@code Paxos.useV2()} is true, and to the
     * legacy implementation otherwise.
     */
    private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException {
        if (Paxos.useV2()) {
            return Paxos.read(group, consistencyLevel);
        }
        return StorageProxy.legacyReadWithPaxos(group, consistencyLevel, queryStartNanoTime);
    }

    /**
     * Performs a SERIAL/LOCAL_SERIAL read through the legacy Paxos path.
     * <p>
     * A Paxos round is first run via {@code doPaxos}, proposing an empty update when
     * {@code Paxos.isLinearizable()} is true and a {@code null} proposal otherwise. The rows are then
     * fetched at {@code LOCAL_QUORUM} for {@code LOCAL_SERIAL} and at {@code QUORUM} for any other level.
     * Write timeouts and write failures raised by the Paxos round are translated into their read
     * counterparts. Unavailable, timeout, abort and failure outcomes are counted on the read, CAS-read
     * and per-level metrics; the elapsed time is always recorded in the {@code finally} block.
     *
     * @throws InvalidRequestException if {@code group} holds more than one query
     */
    private static PartitionIterator legacyReadWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException {
        if (group.queries.size() > 1) {
            throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time");
        }

        final long startNanos = Clock.Global.nanoTime();
        final SinglePartitionReadCommand query = group.queries.get(0);
        final TableMetadata table = query.metadata();
        final DecoratedKey partitionKey = query.partitionKey();
        // Only used to populate the ReadTimeoutException built from a Paxos write timeout.
        final int requiredResponses = consistencyLevel.blockFor(Keyspace.open(table.keyspace).getReplicationStrategy());

        try {
            final ConsistencyLevel quorumLevel;
            if (consistencyLevel == ConsistencyLevel.LOCAL_SERIAL) {
                quorumLevel = ConsistencyLevel.LOCAL_QUORUM;
            } else {
                quorumLevel = ConsistencyLevel.QUORUM;
            }

            try {
                Function<Ballot, Pair<PartitionUpdate, RowIterator>> proposer;
                if (Paxos.isLinearizable()) {
                    proposer = ballot -> Pair.create(PartitionUpdate.emptyUpdate(table, partitionKey), null);
                } else {
                    proposer = ballot -> null;
                }
                StorageProxy.doPaxos(table, partitionKey, consistencyLevel, quorumLevel, ConsistencyLevel.ANY, startNanos, ClientRequestsMetricsHolder.casReadMetrics, proposer);
            }
            catch (WriteTimeoutException timeout) {
                // The caller asked for a read, so surface the Paxos write timeout as a read timeout.
                throw new ReadTimeoutException(consistencyLevel, 0, requiredResponses, false);
            }
            catch (WriteFailureException failure) {
                throw new ReadFailureException(consistencyLevel, failure.received, failure.blockFor, false, failure.failureReasonByEndpoint);
            }

            return StorageProxy.fetchRows(group.queries, quorumLevel, queryStartNanoTime);
        }
        catch (UnavailableException unavailable) {
            ClientRequestsMetricsHolder.readMetrics.unavailables.mark();
            ClientRequestsMetricsHolder.casReadMetrics.unavailables.mark();
            ClientRequestsMetricsHolder.readMetricsForLevel(consistencyLevel).unavailables.mark();
            StorageProxy.logRequestException(unavailable, group.queries);
            throw unavailable;
        }
        catch (ReadTimeoutException timedOut) {
            ClientRequestsMetricsHolder.readMetrics.timeouts.mark();
            ClientRequestsMetricsHolder.casReadMetrics.timeouts.mark();
            ClientRequestsMetricsHolder.readMetricsForLevel(consistencyLevel).timeouts.mark();
            StorageProxy.logRequestException(timedOut, group.queries);
            throw timedOut;
        }
        catch (ReadAbortException aborted) {
            ClientRequestsMetricsHolder.readMetrics.markAbort(aborted);
            ClientRequestsMetricsHolder.casReadMetrics.markAbort(aborted);
            ClientRequestsMetricsHolder.readMetricsForLevel(consistencyLevel).markAbort(aborted);
            throw aborted;
        }
        catch (ReadFailureException failed) {
            ClientRequestsMetricsHolder.readMetrics.failures.mark();
            ClientRequestsMetricsHolder.casReadMetrics.failures.mark();
            ClientRequestsMetricsHolder.readMetricsForLevel(consistencyLevel).failures.mark();
            throw failed;
        }
        finally {
            // Latency is recorded on every exit path, successful or not.
            final long elapsedNanos = Clock.Global.nanoTime() - startNanos;
            ClientRequestsMetricsHolder.readMetrics.addNano(elapsedNanos);
            ClientRequestsMetricsHolder.casReadMetrics.addNano(elapsedNanos);
            ClientRequestsMetricsHolder.readMetricsForLevel(consistencyLevel).addNano(elapsedNanos);
            Keyspace.open(table.keyspace).getColumnFamilyStore(table.name).metric.coordinatorReadLatency.update(elapsedNanos, TimeUnit.NANOSECONDS);
        }
    }

    /*
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private static PartitionIterator readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException, ReadFailureException, ReadTimeoutException {
        PartitionIterator partitionIterator;
        long start = Clock.Global.nanoTime();
        try {
            PartitionIterator result = StorageProxy.fetchRows(group.queries, consistencyLevel, queryStartNanoTime);
            boolean enforceStrictLiveness = ((SinglePartitionReadCommand)group.queries.get(0)).metadata().enforceStrictLiveness();
            if (group.queries.size() > 1) {
                result = group.limits().filter(result, group.nowInSec(), group.selectsFullPartition(), enforceStrictLiveness);
            }
            partitionIterator = result;
        }
        catch (UnavailableException e) {
            try {
                ClientRequestsMetricsHolder.readMetrics.unavailables.mark();
                ClientRequestsMetricsHolder.readMetricsForLevel((ConsistencyLevel)consistencyLevel).unavailables.mark();
                StorageProxy.logRequestException(e, group.queries);
                throw e;
                catch (ReadTimeoutException e2) {
                    ClientRequestsMetricsHolder.readMetrics.timeouts.mark();
                    ClientRequestsMetricsHolder.readMetricsForLevel((ConsistencyLevel)consistencyLevel).timeouts.mark();
                    StorageProxy.logRequestException(e2, group.queries);
                    throw e2;
                }
                catch (ReadAbortException e3) {
                    StorageProxy.recordReadRegularAbort(consistencyLevel, e3);
                    throw e3;
                }
                catch (ReadFailureException e4) {
                    ClientRequestsMetricsHolder.readMetrics.failures.mark();
                    ClientRequestsMetricsHolder.readMetricsForLevel((ConsistencyLevel)consistencyLevel).failures.mark();
                    throw e4;
                }
            }
            catch (Throwable throwable) {
                long latency = Clock.Global.nanoTime() - start;
                ClientRequestsMetricsHolder.readMetrics.addNano(latency);
                ClientRequestsMetricsHolder.readMetricsForLevel(consistencyLevel).addNano(latency);
                Iterator iterator = group.queries.iterator();
                while (true) {
                    if (!iterator.hasNext()) {
                        throw throwable;
                    }
                    ReadCommand command = (ReadCommand)iterator.next();
                    Keyspace.openAndGetStore((TableMetadata)command.metadata()).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
                }
            }
        }
        long latency = Clock.Global.nanoTime() - start;
        ClientRequestsMetricsHolder.readMetrics.addNano(latency);
        ClientRequestsMetricsHolder.readMetricsForLevel(consistencyLevel).addNano(latency);
        Iterator iterator = group.queries.iterator();
        while (iterator.hasNext()) {
            ReadCommand command = (ReadCommand)iterator.next();
            Keyspace.openAndGetStore((TableMetadata)command.metadata()).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
        }
        return partitionIterator;
    }

    /**
     * Marks an aborted regular read on both the global read metrics and the metrics of the
     * given consistency level.
     *
     * @param consistencyLevel level whose per-level read metrics are updated
     * @param cause            the throwable passed to {@code markAbort}
     */
    public static void recordReadRegularAbort(ConsistencyLevel consistencyLevel, Throwable cause) {
        ClientRequestsMetricsHolder.readMetrics.markAbort(cause);
        ClientRequestsMetricsHolder.readMetricsForLevel(consistencyLevel).markAbort(cause);
    }

    /**
     * Concatenates {@code iterators} into one partition iterator.
     * <p>
     * When {@code repairs} is empty the plain concatenation is returned. Otherwise the result is a
     * wrapper whose {@code close()} first closes the concatenation, then calls
     * {@code maybeSendAdditionalWrites()} on every repair, and finally {@code awaitWrites()} on every
     * repair.
     *
     * @param iterators the iterators to concatenate
     * @param repairs   read repairs to complete when the returned iterator is closed
     */
    public static PartitionIterator concatAndBlockOnRepair(List<PartitionIterator> iterators, final List<ReadRepair<?, ?>> repairs) {
        final PartitionIterator merged = PartitionIterators.concat(iterators);
        if (!repairs.isEmpty()) {
            return new PartitionIterator(){

                @Override
                public void close() {
                    merged.close();
                    // Two separate passes: every repair gets to send before any of them is awaited.
                    for (ReadRepair<?, ?> repair : repairs) {
                        repair.maybeSendAdditionalWrites();
                    }
                    for (ReadRepair<?, ?> repair : repairs) {
                        repair.awaitWrites();
                    }
                }

                @Override
                public boolean hasNext() {
                    return merged.hasNext();
                }

                @Override
                public RowIterator next() {
                    return merged.next();
                }
            };
        }
        return merged;
    }

    /**
     * Runs all {@code commands} through read executors, one phase at a time across every command:
     * create the executors (marking each as a local or remote request), start them, optionally try
     * additional replicas, await responses, optionally send additional data requests, await read
     * repair, and finally collect each executor's result and read repair.
     *
     * @return the concatenation of all results, wrapped by {@code concatAndBlockOnRepair}
     */
    private static PartitionIterator fetchRows(List<SinglePartitionReadCommand> commands, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException, ReadFailureException, ReadTimeoutException {
        final int count = commands.size();
        final AbstractReadExecutor[] executors = new AbstractReadExecutor[count];

        for (int idx = 0; idx < count; idx++) {
            AbstractReadExecutor executor = AbstractReadExecutor.getReadExecutor(commands.get(idx), consistencyLevel, queryStartNanoTime);
            executors[idx] = executor;
            if (executor.hasLocalRead()) {
                ClientRequestsMetricsHolder.readMetrics.localRequests.mark();
            } else {
                ClientRequestsMetricsHolder.readMetrics.remoteRequests.mark();
            }
        }

        for (AbstractReadExecutor executor : executors) {
            executor.executeAsync();
        }
        for (AbstractReadExecutor executor : executors) {
            executor.maybeTryAdditionalReplicas();
        }

        final boolean logRepairAttempts = instance.isLoggingReadRepairs();
        for (AbstractReadExecutor executor : executors) {
            executor.awaitResponses(logRepairAttempts);
        }
        for (AbstractReadExecutor executor : executors) {
            executor.maybeSendAdditionalDataRequests();
        }
        for (AbstractReadExecutor executor : executors) {
            executor.awaitReadRepair();
        }

        List<PartitionIterator> results = new ArrayList<>(count);
        List<ReadRepair<?, ?>> repairs = new ArrayList<>(count);
        for (AbstractReadExecutor executor : executors) {
            results.add(executor.getResult());
            repairs.add(executor.getReadRepair());
        }
        return StorageProxy.concatAndBlockOnRepair(results, repairs);
    }

    /**
     * Executes a partition range read through {@code RangeCommands.partitions}.
     * <p>
     * When the partition denylist is enabled for range reads and the requested key range contains at
     * least one denied key, the read is counted as rejected and refused.
     *
     * @throws InvalidRequestException if the range contains denylisted keys
     */
    public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) {
        boolean rangeDenylistActive = DatabaseDescriptor.getPartitionDenylistEnabled() && DatabaseDescriptor.getDenylistRangeReadsEnabled();
        if (rangeDenylistActive) {
            int deniedKeys = partitionDenylist.getDeniedKeysInRangeCount(command.metadata().id, command.dataRange().keyRange());
            if (deniedKeys > 0) {
                denylistMetrics.incrementRangeReadsRejected();
                String loggableRange = command.loggableTokens();
                throw new InvalidRequestException(String.format("Attempted to read a range containing %d denylisted keys in %s/%s. Range read: %s", deniedKeys, command.metadata().keyspace, command.metadata().name, loggableRange));
            }
        }
        return RangeCommands.partitions(command, consistencyLevel, queryStartNanoTime);
    }

    /**
     * Returns the result of {@code describeSchemaVersions(false)}, i.e. host addresses are rendered
     * with {@code withPort = false}.
     */
    @Override
    public Map<String, List<String>> getSchemaVersions() {
        return StorageProxy.describeSchemaVersions(false);
    }

    /**
     * Returns the result of {@code describeSchemaVersions(true)}, i.e. host addresses are rendered
     * with {@code withPort = true}.
     */
    @Override
    public Map<String, List<String>> getSchemaVersionsWithPort() {
        return StorageProxy.describeSchemaVersions(true);
    }

    /**
     * Asks every live member for its schema version and groups host addresses by the version reported.
     * <p>
     * A {@code SCHEMA_VERSION_REQ} message is sent to each live member and replies are awaited for at
     * most the RPC timeout. Every live or unreachable member that did not answer is listed under the
     * {@code UNREACHABLE} key. Disagreements with the local schema version are logged at debug level.
     *
     * @param withPort forwarded to {@code getHostAddress} when rendering each host
     * @return map from schema version string (or {@code UNREACHABLE}) to the hosts in that group
     */
    public static Map<String, List<String>> describeSchemaVersions(boolean withPort) {
        final String localVersion = Schema.instance.getVersion().toString();
        final Map<InetAddressAndPort, UUID> responses = new ConcurrentHashMap<>();
        final Set<InetAddressAndPort> live = Gossiper.instance.getLiveMembers();
        final CountDownLatch pending = CountDownLatch.newCountDownLatch(live.size());

        // Each reply records the sender's version and releases one latch permit.
        RequestCallback<UUID> onReply = reply -> {
            responses.put(reply.from(), reply.payload);
            pending.decrement();
        };
        Message<NoPayload> request = Message.out(Verb.SCHEMA_VERSION_REQ, NoPayload.noPayload);
        for (InetAddressAndPort peer : live) {
            MessagingService.instance().sendWithCallback(request, peer, onReply);
        }

        try {
            pending.await(DatabaseDescriptor.getRpcTimeout(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
        }
        catch (InterruptedException e) {
            throw new UncheckedInterruptedException(e);
        }

        Map<String, List<String>> hostsByVersion = new HashMap<>();
        Iterable<InetAddressAndPort> everyone = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
        for (InetAddressAndPort host : everyone) {
            UUID reported = responses.get(host);
            String versionKey = reported != null ? reported.toString() : UNREACHABLE;
            hostsByVersion.computeIfAbsent(versionKey, ignored -> new ArrayList<>()).add(host.getHostAddress(withPort));
        }

        List<String> silentHosts = hostsByVersion.get(UNREACHABLE);
        if (silentHosts != null) {
            logger.debug("Hosts not in agreement. Didn't get a response from everybody: {}", StringUtils.join(silentHosts, ","));
        }
        for (Map.Entry<String, List<String>> group : hostsByVersion.entrySet()) {
            String version = group.getKey();
            if (version.equals(UNREACHABLE) || version.equals(localVersion)) {
                continue;
            }
            for (String host : group.getValue()) {
                logger.debug("{} disagrees ({})", host, version);
            }
        }
        if (hostsByVersion.size() == 1) {
            logger.debug("Schemas are in agreement.");
        }
        return hostsByVersion;
    }

    /**
     * Returns whether hinted handoff is enabled, as reported by {@code DatabaseDescriptor}.
     */
    @Override
    public boolean getHintedHandoffEnabled() {
        return DatabaseDescriptor.hintedHandoffEnabled();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Enables or disables hinted handoff while holding the monitor of {@code StorageService.instance}.
     * When enabling, {@code checkServiceAllowedToStart("hinted handoff")} is invoked first.
     *
     * @param b {@code true} to enable hinted handoff, {@code false} to disable it
     */
    @Override
    public void setHintedHandoffEnabled(boolean b) {
        synchronized (StorageService.instance) {
            if (b) {
                StorageService.instance.checkServiceAllowedToStart("hinted handoff");
            }
            DatabaseDescriptor.setHintedHandoffEnabled(b);
        }
    }

    /**
     * Delegates to {@code DatabaseDescriptor.enableHintsForDC}.
     *
     * @param dc name of the data center
     */
    @Override
    public void enableHintsForDC(String dc) {
        DatabaseDescriptor.enableHintsForDC(dc);
    }

    /**
     * Delegates to {@code DatabaseDescriptor.disableHintsForDC}.
     *
     * @param dc name of the data center
     */
    @Override
    public void disableHintsForDC(String dc) {
        DatabaseDescriptor.disableHintsForDC(dc);
    }

    /**
     * Returns the set reported by {@code DatabaseDescriptor.hintedHandoffDisabledDCs()}.
     */
    @Override
    public Set<String> getHintedHandoffDisabledDCs() {
        return DatabaseDescriptor.hintedHandoffDisabledDCs();
    }

    /**
     * Returns the maximum hint window reported by {@code DatabaseDescriptor.getMaxHintWindow()}.
     */
    @Override
    public int getMaxHintWindow() {
        return DatabaseDescriptor.getMaxHintWindow();
    }

    /**
     * Delegates to {@code DatabaseDescriptor.setMaxHintWindow}.
     *
     * @param ms new maximum hint window (the parameter name suggests milliseconds)
     */
    @Override
    public void setMaxHintWindow(int ms) {
        DatabaseDescriptor.setMaxHintWindow(ms);
    }

    /**
     * Returns the value reported by {@code DatabaseDescriptor.getMaxHintsSizePerHostInMiB()}.
     */
    @Override
    public int getMaxHintsSizePerHostInMiB() {
        return DatabaseDescriptor.getMaxHintsSizePerHostInMiB();
    }

    /**
     * Delegates to {@code DatabaseDescriptor.setMaxHintsSizePerHostInMiB}.
     *
     * @param value new per-host hints size limit
     */
    @Override
    public void setMaxHintsSizePerHostInMiB(int value) {
        DatabaseDescriptor.setMaxHintsSizePerHostInMiB(value);
    }

    /**
     * Convenience overload of {@code shouldHint(replica, true)}.
     */
    public static boolean shouldHint(Replica replica) {
        return StorageProxy.shouldHint(replica, true);
    }

    /**
     * Decides whether a hint should be stored for {@code replica}.
     * <p>
     * No hint is written when hinted handoff is disabled, the replica is transient or is this node,
     * its data center is in the disabled set, the endpoint has no host id, the hint window has
     * expired, or the hints already stored for the host exceed the configured positive size limit.
     *
     * @param replica                   the replica a hint would be written for
     * @param tryEnablePersistentWindow when true and the persistent hint window is enabled, the window
     *                                  is additionally checked against the earliest hint stored for
     *                                  the host
     * @return {@code true} if a hint should be written
     */
    public static boolean shouldHint(Replica replica, boolean tryEnablePersistentWindow) {
        boolean candidate = DatabaseDescriptor.hintedHandoffEnabled() && !replica.isTransient() && !replica.isSelf();
        if (!candidate) {
            return false;
        }

        Set<String> disabledDCs = DatabaseDescriptor.hintedHandoffDisabledDCs();
        if (!disabledDCs.isEmpty()) {
            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
            if (disabledDCs.contains(dc)) {
                Tracing.trace("Not hinting {} since its data center {} has been disabled {}", replica, dc, disabledDCs);
                return false;
            }
        }

        InetAddressAndPort endpoint = replica.endpoint();
        int windowLength = DatabaseDescriptor.getMaxHintWindow();
        long downtime = Gossiper.instance.getEndpointDowntime(endpoint);
        boolean windowExpired = downtime > (long)windowLength;

        UUID hostId = StorageService.instance.getHostIdForEndpoint(endpoint);
        if (hostId == null) {
            Tracing.trace("Discarding hint for endpoint not part of ring: {}", endpoint);
            return false;
        }

        // A window that has not expired by downtime may still expire by age of the earliest stored hint.
        if (tryEnablePersistentWindow && !windowExpired && DatabaseDescriptor.hintWindowPersistentEnabled()) {
            long earliestHint = HintsService.instance.getEarliestHintForHost(hostId);
            windowExpired = Clock.Global.currentTimeMillis() - (long)windowLength > earliestHint;
            if (windowExpired) {
                Tracing.trace("Not hinting {} for which there is the earliest hint stored at {}", replica, earliestHint);
            }
        }

        if (windowExpired) {
            HintsService.instance.metrics.incrPastWindow(endpoint);
            Tracing.trace("Not hinting {} which has been down {} ms", endpoint, downtime);
            return false;
        }

        long sizeLimit = DatabaseDescriptor.getMaxHintsSizePerHost();
        long storedSize = HintsService.instance.getTotalHintsSize(hostId);
        if (sizeLimit > 0L && storedSize > sizeLimit) {
            Tracing.trace("Not hinting {} which has reached to the max hints size {} bytes on disk. The actual hints size on disk: {}", endpoint, sizeLimit, storedSize);
            return false;
        }
        return true;
    }

    /**
     * Sends a truncate request for the given table to every live ring member and blocks until the
     * response handler completes.
     *
     * @param keyspace keyspace of the table to truncate
     * @param cfname   name of the table to truncate
     * @throws UnavailableException if any token owner is currently unreachable
     * @throws TimeoutException     if the response handler times out
     */
    public static void truncateBlocking(String keyspace, String cfname) throws UnavailableException, TimeoutException {
        logger.debug("Starting a blocking truncate operation on keyspace {}, CF {}", keyspace, cfname);

        if (StorageProxy.isAnyStorageHostDown()) {
            logger.info("Cannot perform truncate, some hosts are down");
            int live = Gossiper.instance.getLiveMembers().size();
            int unreachable = Gossiper.instance.getUnreachableMembers().size();
            throw UnavailableException.create(ConsistencyLevel.ALL, live + unreachable, live);
        }

        Set<InetAddressAndPort> recipients = StorageService.instance.getLiveRingMembers(true);
        // One response is expected from every recipient.
        TruncateResponseHandler handler = new TruncateResponseHandler(recipients.size());

        Tracing.trace("Enqueuing truncate messages to hosts {}", recipients);
        Message<TruncateRequest> request = Message.out(Verb.TRUNCATE_REQ, new TruncateRequest(keyspace, cfname));
        for (InetAddressAndPort recipient : recipients) {
            MessagingService.instance().sendWithCallback(request, recipient, handler);
        }

        try {
            handler.get();
        }
        catch (TimeoutException timeout) {
            Tracing.trace("Timed out");
            throw timeout;
        }
    }

    /**
     * Returns {@code true} when the gossiper reports at least one unreachable token owner.
     */
    private static boolean isAnyStorageHostDown() {
        boolean noneUnreachable = Gossiper.instance.getUnreachableTokenOwners().isEmpty();
        return !noneUnreachable;
    }

    /**
     * Logs a request exception at INFO level through {@code NoSpamLogger}, using
     * {@code FAILURE_LOGGING_INTERVAL_SECONDS} as the logging interval. The log arguments (the exception
     * message and the CQL form of every command, joined with {@code "; "}) are built lazily by the
     * supplier.
     *
     * @param exception the exception to report; its simple class name prefixes the message
     * @param commands  the commands that were being executed
     */
    public static void logRequestException(Exception exception, Collection<? extends ReadCommand> commands) {
        String template = exception.getClass().getSimpleName() + " \"{}\" while executing {}";
        NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, (long)FAILURE_LOGGING_INTERVAL_SECONDS, TimeUnit.SECONDS, template, () -> {
            String cql = commands.stream().map(ReadCommand::toCQLString).collect(Collectors.joining("; "));
            return new Object[]{exception.getMessage(), cql};
        });
    }

    /**
     * Returns the current count of the {@code StorageMetrics.totalHints} counter.
     */
    @Override
    public long getTotalHints() {
        return StorageMetrics.totalHints.getCount();
    }

    /**
     * Returns the current value of the {@code maxHintsInProgress} field.
     */
    @Override
    public int getMaxHintsInProgress() {
        return maxHintsInProgress;
    }

    /**
     * Overwrites the {@code maxHintsInProgress} field.
     *
     * @param qs the new value
     */
    @Override
    public void setMaxHintsInProgress(int qs) {
        maxHintsInProgress = qs;
    }

    /**
     * Returns the count of {@code StorageMetrics.totalHintsInProgress}, narrowed from {@code long}
     * to {@code int}.
     */
    @Override
    public int getHintsInProgress() {
        return (int)StorageMetrics.totalHintsInProgress.getCount();
    }

    /**
     * Logs a warning if any hints are still counted as in progress; does nothing otherwise.
     */
    public void verifyNoHintsInProgress() {
        if (this.getHintsInProgress() <= 0) {
            return;
        }
        logger.warn("Some hints were not written before shutdown.  This is not supposed to happen.  You should (a) run repair, and (b) file a bug report");
    }

    /**
     * Returns the in-progress hint counter that {@code hintsInProgress} loads for {@code destination}.
     * Any exception raised by the loader is rethrown wrapped in an {@link AssertionError}.
     */
    private static AtomicInteger getHintsInProgressFor(InetAddressAndPort destination) {
        AtomicInteger counter;
        try {
            counter = hintsInProgress.load(destination);
        }
        catch (Exception loadFailure) {
            throw new AssertionError(loadFailure);
        }
        return counter;
    }

    /**
     * Submits a hint for a single replica by wrapping it in an {@code EndpointsForToken} keyed by the
     * right bound of the replica's range and delegating to the multi-target overload.
     */
    public static Future<Void> submitHint(Mutation mutation, Replica target, AbstractWriteResponseHandler<IMutation> responseHandler) {
        EndpointsForToken singleTarget = EndpointsForToken.of(target.range().right, target);
        return StorageProxy.submitHint(mutation, singleTarget, responseHandler);
    }

    /**
     * Builds and submits a hint-writing task for {@code targets}, which must all be full replicas.
     * <p>
     * When run, the task resolves the host id of every target endpoint (endpoints without one are
     * skipped with a debug log), writes one hint for the resolved host ids, bumps the created-hints
     * metric for each retained endpoint, and signals {@code responseHandler} with a {@code null}
     * response when it is non-null and its consistency level is {@code ANY}.
     *
     * @param mutation        the mutation to store as a hint
     * @param targets         the replicas the hint is destined for
     * @param responseHandler handler to notify for {@code ANY} writes; may be {@code null}
     * @return the future returned by submitting the task
     */
    public static Future<Void> submitHint(final Mutation mutation, EndpointsForToken targets, final AbstractWriteResponseHandler<IMutation> responseHandler) {
        Replicas.assertFull(targets);
        HintRunnable hintTask = new HintRunnable(targets){

            @Override
            public void runMayThrow() {
                Set<InetAddressAndPort> retained = new HashSet<>(this.targets.size());
                Set<UUID> hostIds = new HashSet<>(this.targets.size());
                for (InetAddressAndPort target : this.targets.endpoints()) {
                    UUID hostId = StorageService.instance.getHostIdForEndpoint(target);
                    if (hostId == null) {
                        logger.debug("Discarding hint for endpoint not part of ring: {}", target);
                    } else {
                        hostIds.add(hostId);
                        retained.add(target);
                    }
                }
                logger.trace("Adding hints for {}", retained);
                HintsService.instance.write(hostIds, Hint.create(mutation, Clock.Global.currentTimeMillis()));
                retained.forEach(HintsService.instance.metrics::incrCreatedHints);

                boolean notifyHandler = responseHandler != null && responseHandler.replicaPlan.consistencyLevel() == ConsistencyLevel.ANY;
                if (notifyHandler) {
                    responseHandler.onResponse(null);
                }
            }
        };
        return StorageProxy.submitHint(hintTask);
    }

    /**
     * Accounts for the hints about to be written and hands the task to the mutation stage: the total
     * in-progress counter grows by the number of targets and the per-endpoint counter of every target
     * is incremented before the task is submitted.
     */
    private static Future<Void> submitHint(HintRunnable runnable) {
        long targetCount = runnable.targets.size();
        StorageMetrics.totalHintsInProgress.inc(targetCount);
        for (Replica replica : runnable.targets) {
            AtomicInteger perEndpoint = StorageProxy.getHintsInProgressFor(replica.endpoint());
            perEndpoint.incrementAndGet();
        }
        return Stage.MUTATION.submit(runnable);
    }

    /**
     * Returns the RPC timeout from {@code DatabaseDescriptor}, in milliseconds.
     */
    @Override
    public Long getRpcTimeout() {
        return DatabaseDescriptor.getRpcTimeout(TimeUnit.MILLISECONDS);
    }

    /**
     * Delegates to {@code DatabaseDescriptor.setRpcTimeout}.
     *
     * @param timeoutInMillis the new RPC timeout
     */
    @Override
    public void setRpcTimeout(Long timeoutInMillis) {
        DatabaseDescriptor.setRpcTimeout(timeoutInMillis);
    }

    /**
     * Returns the read RPC timeout from {@code DatabaseDescriptor}, in milliseconds.
     */
    @Override
    public Long getReadRpcTimeout() {
        return DatabaseDescriptor.getReadRpcTimeout(TimeUnit.MILLISECONDS);
    }

    /**
     * Delegates to {@code DatabaseDescriptor.setReadRpcTimeout}.
     *
     * @param timeoutInMillis the new read RPC timeout
     */
    @Override
    public void setReadRpcTimeout(Long timeoutInMillis) {
        DatabaseDescriptor.setReadRpcTimeout(timeoutInMillis);
    }

    /**
     * Returns the write RPC timeout from {@code DatabaseDescriptor}, in milliseconds.
     */
    @Override
    public Long getWriteRpcTimeout() {
        return DatabaseDescriptor.getWriteRpcTimeout(TimeUnit.MILLISECONDS);
    }

    /**
     * Delegates to {@code DatabaseDescriptor.setWriteRpcTimeout}.
     *
     * @param timeoutInMillis the new write RPC timeout
     */
    @Override
    public void setWriteRpcTimeout(Long timeoutInMillis) {
        DatabaseDescriptor.setWriteRpcTimeout(timeoutInMillis);
    }

    /**
     * Returns the counter-write RPC timeout from {@code DatabaseDescriptor}, in milliseconds.
     */
    @Override
    public Long getCounterWriteRpcTimeout() {
        return DatabaseDescriptor.getCounterWriteRpcTimeout(TimeUnit.MILLISECONDS);
    }

    /**
     * Delegates to {@code DatabaseDescriptor.setCounterWriteRpcTimeout}.
     *
     * @param timeoutInMillis the new counter-write RPC timeout
     */
    @Override
    public void setCounterWriteRpcTimeout(Long timeoutInMillis) {
        DatabaseDescriptor.setCounterWriteRpcTimeout(timeoutInMillis);
    }

    /**
     * Returns the CAS contention timeout from {@code DatabaseDescriptor}, in milliseconds.
     */
    @Override
    public Long getCasContentionTimeout() {
        return DatabaseDescriptor.getCasContentionTimeout(TimeUnit.MILLISECONDS);
    }

    /**
     * Delegates to {@code DatabaseDescriptor.setCasContentionTimeout}.
     *
     * @param timeoutInMillis the new CAS contention timeout
     */
    @Override
    public void setCasContentionTimeout(Long timeoutInMillis) {
        DatabaseDescriptor.setCasContentionTimeout(timeoutInMillis);
    }

    /**
     * Returns the range RPC timeout from {@code DatabaseDescriptor}, in milliseconds.
     */
    @Override
    public Long getRangeRpcTimeout() {
        return DatabaseDescriptor.getRangeRpcTimeout(TimeUnit.MILLISECONDS);
    }

    /**
     * Delegates to {@code DatabaseDescriptor.setRangeRpcTimeout}.
     *
     * @param timeoutInMillis the new range RPC timeout
     */
    @Override
    public void setRangeRpcTimeout(Long timeoutInMillis) {
        DatabaseDescriptor.setRangeRpcTimeout(timeoutInMillis);
    }

    /**
     * Returns the truncate RPC timeout from {@code DatabaseDescriptor}, in milliseconds.
     */
    @Override
    public Long getTruncateRpcTimeout() {
        return DatabaseDescriptor.getTruncateRpcTimeout(TimeUnit.MILLISECONDS);
    }

    /**
     * Delegates to {@code DatabaseDescriptor.setTruncateRpcTimeout}.
     *
     * @param timeoutInMillis the new truncate RPC timeout
     */
    @Override
    public void setTruncateRpcTimeout(Long timeoutInMillis) {
        DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis);
    }

    /**
     * Returns the value reported by
     * {@code DatabaseDescriptor.getNativeTransportMaxConcurrentConnections()}.
     */
    @Override
    public Long getNativeTransportMaxConcurrentConnections() {
        return DatabaseDescriptor.getNativeTransportMaxConcurrentConnections();
    }

    /**
     * Delegates to {@code DatabaseDescriptor.setNativeTransportMaxConcurrentConnections}.
     *
     * @param nativeTransportMaxConcurrentConnections the new limit
     */
    @Override
    public void setNativeTransportMaxConcurrentConnections(Long nativeTransportMaxConcurrentConnections) {
        DatabaseDescriptor.setNativeTransportMaxConcurrentConnections(nativeTransportMaxConcurrentConnections);
    }

    /**
     * Returns the value reported by
     * {@code DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp()}.
     */
    public Long getNativeTransportMaxConcurrentConnectionsPerIp() {
        return DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp();
    }

    /**
     * Delegates to {@code DatabaseDescriptor.setNativeTransportMaxConcurrentConnectionsPerIp}.
     *
     * @param nativeTransportMaxConcurrentConnections the new per-IP limit
     */
    public void setNativeTransportMaxConcurrentConnectionsPerIp(Long nativeTransportMaxConcurrentConnections) {
        DatabaseDescriptor.setNativeTransportMaxConcurrentConnectionsPerIp(nativeTransportMaxConcurrentConnections);
    }

    /**
     * Delegates to {@code TriggerExecutor.instance.reloadClasses()}.
     */
    @Override
    public void reloadTriggerClasses() {
        TriggerExecutor.instance.reloadClasses();
    }

    /**
     * Returns the current count of {@code ReadRepairMetrics.attempted}.
     */
    @Override
    public long getReadRepairAttempted() {
        return ReadRepairMetrics.attempted.getCount();
    }

    /**
     * Returns the current count of {@code ReadRepairMetrics.repairedBlocking}.
     */
    @Override
    public long getReadRepairRepairedBlocking() {
        return ReadRepairMetrics.repairedBlocking.getCount();
    }

    /**
     * Returns the current count of {@code ReadRepairMetrics.repairedBackground}.
     */
    @Override
    public long getReadRepairRepairedBackground() {
        return ReadRepairMetrics.repairedBackground.getCount();
    }

    /**
     * Returns the current count of {@code ReadRepairMetrics.timedOut}.
     */
    @Override
    public long getReadRepairRepairTimedOut() {
        return ReadRepairMetrics.timedOut.getCount();
    }

    /**
     * Returns the table count reported by {@code Schema.instance.getNumberOfTables()}.
     */
    @Override
    public int getNumberOfTables() {
        return Schema.instance.getNumberOfTables();
    }

    /**
     * Returns the string form of the configured ideal consistency level, or the empty string when
     * {@code DatabaseDescriptor.getIdealConsistencyLevel()} returns {@code null}.
     */
    @Override
    public String getIdealConsistencyLevel() {
        ConsistencyLevel configured = DatabaseDescriptor.getIdealConsistencyLevel();
        return Objects.toString(configured, "");
    }

    /**
     * Parses {@code cl} as a {@link ConsistencyLevel} name (trimmed and upper-cased), installs it through
     * {@code DatabaseDescriptor.setIdealConsistencyLevel} and returns a message describing the change.
     *
     * @param cl name of the new ideal consistency level
     * @return a message containing the new and the previous value; the previous value is rendered as
     *         {@code null} when none was configured
     * @throws IllegalArgumentException if {@code ConsistencyLevel.valueOf} rejects the name
     */
    @Override
    public String setIdealConsistencyLevel(String cl) {
        ConsistencyLevel original = DatabaseDescriptor.getIdealConsistencyLevel();
        // Locale.ROOT keeps the upper-casing independent of the JVM default locale (e.g. Turkish dotless i).
        ConsistencyLevel newCL = ConsistencyLevel.valueOf(cl.trim().toUpperCase(java.util.Locale.ROOT));
        DatabaseDescriptor.setIdealConsistencyLevel(newCL);
        // Format `original` itself instead of original.toString(): it may be null (the getter null-guards
        // the same value), and dereferencing it here would throw after the new level was already applied.
        return String.format("Updating ideal consistency level new value: %s old value %s", newCL, original);
    }

    /**
     * Deprecated accessor that always returns {@code 0}.
     */
    @Override
    @Deprecated(since="4.0")
    public int getOtcBacklogExpirationInterval() {
        return 0;
    }

    /**
     * Deprecated setter with an empty body; the argument is ignored.
     *
     * @param intervalInMillis unused
     */
    @Override
    @Deprecated(since="4.0")
    public void setOtcBacklogExpirationInterval(int intervalInMillis) {
    }

    /**
     * Sets the repaired-data-tracking-for-range-reads flag in {@code DatabaseDescriptor} to {@code true}.
     */
    @Override
    public void enableRepairedDataTrackingForRangeReads() {
        DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(true);
    }

    /**
     * Sets the repaired-data-tracking-for-range-reads flag in {@code DatabaseDescriptor} to {@code false}.
     */
    @Override
    public void disableRepairedDataTrackingForRangeReads() {
        DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(false);
    }

    /**
     * Returns the repaired-data-tracking-for-range-reads flag from {@code DatabaseDescriptor}.
     */
    @Override
    public boolean getRepairedDataTrackingEnabledForRangeReads() {
        return DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled();
    }

    /**
     * Sets the repaired-data-tracking-for-partition-reads flag in {@code DatabaseDescriptor} to {@code true}.
     */
    @Override
    public void enableRepairedDataTrackingForPartitionReads() {
        DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true);
    }

    /**
     * Sets the repaired-data-tracking-for-partition-reads flag in {@code DatabaseDescriptor} to {@code false}.
     */
    @Override
    public void disableRepairedDataTrackingForPartitionReads() {
        DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(false);
    }

    /**
     * Returns the repaired-data-tracking-for-partition-reads flag from {@code DatabaseDescriptor}.
     */
    @Override
    public boolean getRepairedDataTrackingEnabledForPartitionReads() {
        return DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled();
    }

    /**
     * Calls {@code DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true)}.
     */
    @Override
    public void enableReportingUnconfirmedRepairedDataMismatches() {
        DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
    }

    /**
     * Calls {@code DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(false)}.
     */
    @Override
    public void disableReportingUnconfirmedRepairedDataMismatches() {
        DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(false);
    }

    /**
     * Returns the value of the no-argument
     * {@code DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches()}.
     */
    @Override
    public boolean getReportingUnconfirmedRepairedDataMismatchesEnabled() {
        return DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches();
    }

    /**
     * Returns the value of {@code DatabaseDescriptor.snapshotOnRepairedDataMismatch()}.
     */
    @Override
    public boolean getSnapshotOnRepairedDataMismatchEnabled() {
        return DatabaseDescriptor.snapshotOnRepairedDataMismatch();
    }

    /**
     * Calls {@code DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(true)}.
     */
    @Override
    public void enableSnapshotOnRepairedDataMismatch() {
        DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(true);
    }

    /**
     * Calls {@code DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(false)}.
     */
    @Override
    public void disableSnapshotOnRepairedDataMismatch() {
        DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(false);
    }

    /**
     * Returns the value of {@code DatabaseDescriptor.snapshotOnDuplicateRowDetection()}.
     */
    @Override
    public boolean getSnapshotOnDuplicateRowDetectionEnabled() {
        return DatabaseDescriptor.snapshotOnDuplicateRowDetection();
    }

    /**
     * Calls {@code DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true)}.
     */
    @Override
    public void enableSnapshotOnDuplicateRowDetection() {
        DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
    }

    /**
     * Calls {@code DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(false)}.
     */
    @Override
    public void disableSnapshotOnDuplicateRowDetection() {
        DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(false);
    }

    /**
     * Returns the value of {@code DatabaseDescriptor.checkForDuplicateRowsDuringReads()}.
     */
    @Override
    public boolean getCheckForDuplicateRowsDuringReads() {
        return DatabaseDescriptor.checkForDuplicateRowsDuringReads();
    }

    /**
     * Calls {@code DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(true)}.
     */
    @Override
    public void enableCheckForDuplicateRowsDuringReads() {
        DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(true);
    }

    /**
     * Calls {@code DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(false)}.
     */
    @Override
    public void disableCheckForDuplicateRowsDuringReads() {
        DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(false);
    }

    /**
     * Returns the value of {@code DatabaseDescriptor.checkForDuplicateRowsDuringCompaction()}.
     */
    @Override
    public boolean getCheckForDuplicateRowsDuringCompaction() {
        return DatabaseDescriptor.checkForDuplicateRowsDuringCompaction();
    }

    /**
     * Calls {@code DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(true)}.
     */
    @Override
    public void enableCheckForDuplicateRowsDuringCompaction() {
        DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(true);
    }

    /**
     * Calls {@code DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(false)}.
     */
    @Override
    public void disableCheckForDuplicateRowsDuringCompaction() {
        DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(false);
    }

    /**
     * Delegates to {@code partitionDenylist.initialLoad()}.
     */
    public void initialLoadPartitionDenylist() {
        partitionDenylist.initialLoad();
    }

    /**
     * Delegates to {@code partitionDenylist.load()}.
     */
    @Override
    public void loadPartitionDenylist() {
        partitionDenylist.load();
    }

    /**
     * Returns the value of {@code partitionDenylist.getLoadAttempts()}.
     */
    @Override
    public int getPartitionDenylistLoadAttempts() {
        return partitionDenylist.getLoadAttempts();
    }

    /**
     * Returns the value of {@code partitionDenylist.getLoadSuccesses()}.
     */
    @Override
    public int getPartitionDenylistLoadSuccesses() {
        return partitionDenylist.getLoadSuccesses();
    }

    /**
     * Delegates to {@code DatabaseDescriptor.setPartitionDenylistEnabled}.
     *
     * @param enabled the new value of the flag
     */
    @Override
    public void setEnablePartitionDenylist(boolean enabled) {
        DatabaseDescriptor.setPartitionDenylistEnabled(enabled);
    }

    /**
     * Delegates to {@code DatabaseDescriptor.setDenylistWritesEnabled}.
     *
     * @param enabled the new value of the flag
     */
    @Override
    public void setEnableDenylistWrites(boolean enabled) {
        DatabaseDescriptor.setDenylistWritesEnabled(enabled);
    }

    /**
     * Forwards the flag to {@link DatabaseDescriptor#setDenylistReadsEnabled}.
     *
     * @param enabled value to store
     */
    @Override
    public void setEnableDenylistReads(boolean enabled) {
        final boolean requested = enabled;
        DatabaseDescriptor.setDenylistReadsEnabled(requested);
    }

    /**
     * Forwards the flag to {@link DatabaseDescriptor#setDenylistRangeReadsEnabled}.
     *
     * @param enabled value to store
     */
    @Override
    public void setEnableDenylistRangeReads(boolean enabled) {
        final boolean requested = enabled;
        DatabaseDescriptor.setDenylistRangeReadsEnabled(requested);
    }

    /**
     * Forwards the limit to {@link DatabaseDescriptor#setDenylistMaxKeysPerTable}.
     *
     * @param value limit to store
     */
    @Override
    public void setDenylistMaxKeysPerTable(int value) {
        final int limit = value;
        DatabaseDescriptor.setDenylistMaxKeysPerTable(limit);
    }

    /**
     * Forwards the limit to {@link DatabaseDescriptor#setDenylistMaxKeysTotal}.
     *
     * @param value limit to store
     */
    @Override
    public void setDenylistMaxKeysTotal(int value) {
        final int limit = value;
        DatabaseDescriptor.setDenylistMaxKeysTotal(limit);
    }

    /**
     * Adds a partition key to the denylist.
     *
     * @param keyspace             keyspace name; must be one of {@code Schema.instance.getKeyspaces()}
     * @param table                table name; must resolve to an existing {@link ColumnFamilyStore}
     * @param partitionKeyAsString key text, parsed with the table's partition key type
     * @return {@code false} when the keyspace or table cannot be resolved; otherwise
     *         the result of {@code partitionDenylist.addKeyToDenylist}
     */
    @Override
    public boolean denylistKey(String keyspace, String table, String partitionKeyAsString) {
        // Only look the table up when the keyspace is known to the schema.
        ColumnFamilyStore store = Schema.instance.getKeyspaces().contains(keyspace)
                                  ? ColumnFamilyStore.getIfExists(keyspace, table)
                                  : null;
        if (store == null) {
            return false;
        }
        ByteBuffer rawKey = store.metadata.get().partitionKeyType.fromString(partitionKeyAsString);
        return partitionDenylist.addKeyToDenylist(keyspace, table, rawKey);
    }

    /**
     * Removes a partition key from the denylist.
     *
     * @param keyspace             keyspace name; must be one of {@code Schema.instance.getKeyspaces()}
     * @param table                table name; must resolve to an existing {@link ColumnFamilyStore}
     * @param partitionKeyAsString key text, parsed with the table's partition key type
     * @return {@code false} when the keyspace or table cannot be resolved; otherwise
     *         the result of {@code partitionDenylist.removeKeyFromDenylist}
     */
    @Override
    public boolean removeDenylistKey(String keyspace, String table, String partitionKeyAsString) {
        // Only look the table up when the keyspace is known to the schema.
        ColumnFamilyStore store = Schema.instance.getKeyspaces().contains(keyspace)
                                  ? ColumnFamilyStore.getIfExists(keyspace, table)
                                  : null;
        if (store == null) {
            return false;
        }
        ByteBuffer rawKey = store.metadata.get().partitionKeyType.fromString(partitionKeyAsString);
        return partitionDenylist.removeKeyFromDenylist(keyspace, table, rawKey);
    }

    /**
     * Reports whether a partition key is currently denylisted.
     *
     * @param keyspace             keyspace name; must be one of {@code Schema.instance.getKeyspaces()}
     * @param table                table name; must resolve to an existing {@link ColumnFamilyStore}
     * @param partitionKeyAsString key text, parsed with the table's partition key type
     * @return {@code false} when the keyspace or table cannot be resolved; otherwise
     *         the negation of {@code partitionDenylist.isKeyPermitted}
     */
    @Override
    public boolean isKeyDenylisted(String keyspace, String table, String partitionKeyAsString) {
        // Only look the table up when the keyspace is known to the schema.
        ColumnFamilyStore store = Schema.instance.getKeyspaces().contains(keyspace)
                                  ? ColumnFamilyStore.getIfExists(keyspace, table)
                                  : null;
        if (store == null) {
            return false;
        }
        ByteBuffer rawKey = store.metadata.get().partitionKeyType.fromString(partitionKeyAsString);
        boolean permitted = partitionDenylist.isKeyPermitted(keyspace, table, rawKey);
        return !permitted;
    }

    /**
     * Sets the logging deadline to the current {@code Clock.Global.nanoTime()}
     * plus the given number of seconds.
     *
     * @param seconds length of the logging window, in seconds
     */
    @Override
    public void logBlockingReadRepairAttemptsForNSeconds(int seconds) {
        long nowNanos = Clock.Global.nanoTime();
        long windowNanos = TimeUnit.SECONDS.toNanos(seconds);
        this.logBlockingReadRepairAttemptsUntilNanos = nowNanos + windowNanos;
    }

    /**
     * @return {@code true} while the current {@code Clock.Global.nanoTime()} has not
     *         passed the deadline stored on the {@code StorageProxy} singleton
     */
    @Override
    public boolean isLoggingReadRepairs() {
        long nowNanos = Clock.Global.nanoTime();
        long deadlineNanos = StorageProxy.instance.logBlockingReadRepairAttemptsUntilNanos;
        return nowNanos <= deadlineNanos;
    }

    /**
     * Parses the given name as a {@code Config.PaxosVariant} constant and installs it
     * through {@code Paxos.setPaxosVariant}.
     *
     * @param variant enum constant name; must not be {@code null}
     * @throws NullPointerException     if {@code variant} is {@code null}
     * @throws IllegalArgumentException if {@code variant} names no constant
     */
    @Override
    public void setPaxosVariant(String variant) {
        Preconditions.checkNotNull(variant);
        Config.PaxosVariant parsed = Config.PaxosVariant.valueOf(variant);
        Paxos.setPaxosVariant(parsed);
    }

    /**
     * @return the {@code toString()} form of the value returned by
     *         {@code Paxos.getPaxosVariant()}
     */
    @Override
    public String getPaxosVariant() {
        Object current = Paxos.getPaxosVariant();
        return current.toString();
    }

    /**
     * @return the value reported by {@link DatabaseDescriptor#getUseStatementsEnabled()}
     */
    @Override
    public boolean getUseStatementsEnabled() {
        final boolean enabled = DatabaseDescriptor.getUseStatementsEnabled();
        return enabled;
    }

    /**
     * Forwards the flag to {@link DatabaseDescriptor#setUseStatementsEnabled}.
     *
     * @param enabled value to store
     */
    @Override
    public void setUseStatementsEnabled(boolean enabled) {
        final boolean requested = enabled;
        DatabaseDescriptor.setUseStatementsEnabled(requested);
    }

    /**
     * Forwards the specification string to {@code ContentionStrategy.setStrategy}.
     *
     * @param spec strategy specification, passed through unchanged
     */
    @Override
    public void setPaxosContentionStrategy(String spec) {
        final String requested = spec;
        ContentionStrategy.setStrategy(requested);
    }

    /**
     * @return the specification string reported by
     *         {@code ContentionStrategy.getStrategySpec()}
     */
    @Override
    public String getPaxosContentionStrategy() {
        final String spec = ContentionStrategy.getStrategySpec();
        return spec;
    }

    /**
     * Forwards the flag to {@code PaxosState.setDisableCoordinatorLocking}.
     *
     * @param disabled value to store
     */
    @Override
    public void setPaxosCoordinatorLockingDisabled(boolean disabled) {
        final boolean requested = disabled;
        PaxosState.setDisableCoordinatorLocking(requested);
    }

    /**
     * @return the value reported by {@code PaxosState.getDisableCoordinatorLocking()}
     */
    @Override
    public boolean getPaxosCoordinatorLockingDisabled() {
        final boolean disabled = PaxosState.getDisableCoordinatorLocking();
        return disabled;
    }

    /**
     * @return the value reported by
     *         {@link DatabaseDescriptor#getDumpHeapOnUncaughtException()}
     */
    @Override
    public boolean getDumpHeapOnUncaughtException() {
        final boolean enabled = DatabaseDescriptor.getDumpHeapOnUncaughtException();
        return enabled;
    }

    /**
     * Forwards the flag to {@link DatabaseDescriptor#setDumpHeapOnUncaughtException}.
     *
     * @param enabled value to store
     */
    @Override
    public void setDumpHeapOnUncaughtException(boolean enabled) {
        final boolean requested = enabled;
        DatabaseDescriptor.setDumpHeapOnUncaughtException(requested);
    }

    /**
     * @return the value reported by
     *         {@link DatabaseDescriptor#getSStableReadRatePersistenceEnabled()}
     */
    @Override
    public boolean getSStableReadRatePersistenceEnabled() {
        final boolean enabled = DatabaseDescriptor.getSStableReadRatePersistenceEnabled();
        return enabled;
    }

    /**
     * Forwards the flag to
     * {@link DatabaseDescriptor#setSStableReadRatePersistenceEnabled}.
     *
     * @param enabled value to store
     */
    @Override
    public void setSStableReadRatePersistenceEnabled(boolean enabled) {
        final boolean requested = enabled;
        DatabaseDescriptor.setSStableReadRatePersistenceEnabled(requested);
    }

    /**
     * @return the value reported by
     *         {@link DatabaseDescriptor#getClientRequestSizeMetricsEnabled()}
     */
    @Override
    public boolean getClientRequestSizeMetricsEnabled() {
        final boolean enabled = DatabaseDescriptor.getClientRequestSizeMetricsEnabled();
        return enabled;
    }

    /**
     * Forwards the flag to
     * {@link DatabaseDescriptor#setClientRequestSizeMetricsEnabled}.
     *
     * @param enabled value to store
     */
    @Override
    public void setClientRequestSizeMetricsEnabled(boolean enabled) {
        final boolean requested = enabled;
        DatabaseDescriptor.setClientRequestSizeMetricsEnabled(requested);
    }

    // Class initialisation: creates the singleton, the hint bookkeeping, the denylist
    // objects, registers MBeans, and wires up the three write performers.
    static {
        // The singleton is assigned first; it is passed to registerMBean further down.
        instance = new StorageProxy();
        // Hint limit scales with the processor count reported by FBUtilities.
        maxHintsInProgress = 128 * FBUtilities.getAvailableProcessors();
        // Loader that produces a fresh zeroed counter for an endpoint.
        hintsInProgress = new CacheLoader<InetAddressAndPort, AtomicInteger>(){

            public AtomicInteger load(InetAddressAndPort inetAddress) {
                return new AtomicInteger(0);
            }
        };
        denylistMetrics = new DenylistMetrics();
        partitionDenylist = new PartitionDenylist();
        MBeanWrapper.instance.registerMBean((Object)instance, MBEAN_NAME);
        HintsService.instance.registerMBean();
        // Plain writes: the mutation must be a Mutation; it is sent to the hinted
        // replicas using Stage.MUTATION.
        standardWritePerformer = (mutation, targets, responseHandler, localDataCenter) -> {
            assert (mutation instanceof Mutation);
            StorageProxy.sendToHintedReplicas((Mutation)mutation, targets, responseHandler, localDataCenter, Stage.MUTATION);
        };
        // Counter writes: the local node is removed from the contacts and the counter
        // write task is invoked directly via run() on the calling thread.
        counterWritePerformer = (mutation, targets, responseHandler, localDataCenter) -> {
            EndpointsForToken selected = (EndpointsForToken)((EndpointsForToken)targets.contacts()).withoutSelf();
            Replicas.temporaryAssertFull(selected);
            StorageProxy.counterWriteTask(mutation, targets.withContacts(selected), responseHandler, localDataCenter).run();
        };
        // Same task as counterWritePerformer, but handed to the COUNTER_MUTATION stage
        // executor instead of being run inline.
        counterWriteOnCoordinatorPerformer = (mutation, targets, responseHandler, localDataCenter) -> {
            EndpointsForToken selected = (EndpointsForToken)((EndpointsForToken)targets.contacts()).withoutSelf();
            Replicas.temporaryAssertFull(selected);
            Stage.COUNTER_MUTATION.executor().execute(StorageProxy.counterWriteTask(mutation, targets.withContacts(selected), responseHandler, localDataCenter));
        };
        ReadRepairMetrics.init();
        // Warn once at class load when the configured Paxos variant is not linearizable.
        if (!Paxos.isLinearizable()) {
            logger.warn("This node was started with paxos variant {}. SERIAL (and LOCAL_SERIAL) reads coordinated by this node will not offer linearizability (see CASSANDRA-12126 for details on what this means) with respect to other SERIAL operations. Please note that with this variant, SERIAL reads will be slower than QUORUM reads, yet offer no additional guarantees. This flag should only be used in the restricted case of upgrading from a pre-CASSANDRA-12126 version, and only if you understand the tradeoff.", (Object)Paxos.getPaxosVariant());
        }
    }

    /**
     * Immutable pair of a Paxos ballot and a contention count, with value-based
     * {@link #equals(Object)} and {@link #hashCode()}.
     */
    static class PaxosBallotAndContention {
        /** Ballot half of the pair; may be {@code null} (both equals and hashCode tolerate it). */
        final Ballot ballot;
        /** Contention count half of the pair. */
        final int contentions;

        PaxosBallotAndContention(Ballot ballot, int contentions) {
            this.ballot = ballot;
            this.contentions = contentions;
        }

        /**
         * Combines both fields additively. The contention count is added rather than
         * multiplied in, so that instances with {@code contentions == 0} do not all
         * collapse to the same hash value of 0.
         */
        @Override
        public final int hashCode() {
            int hashCode = 31 + (this.ballot == null ? 0 : this.ballot.hashCode());
            return 31 * hashCode + this.contentions;
        }

        /**
         * @return {@code true} when {@code o} is a {@code PaxosBallotAndContention}
         *         with an equal ballot and the same contention count
         */
        @Override
        public final boolean equals(Object o) {
            if (!(o instanceof PaxosBallotAndContention)) {
                return false;
            }
            PaxosBallotAndContention that = (PaxosBallotAndContention)o;
            return Objects.equals(this.ballot, that.ballot) && this.contentions == that.contentions;
        }
    }

    /**
     * Runnable wrapper for hint work against a set of target replicas. Whatever the
     * outcome of {@link #runMayThrow()}, the in-progress hint counters for the
     * targets are decremented once the run finishes.
     */
    private static abstract class HintRunnable
    implements Runnable {
        public final EndpointsForToken targets;

        protected HintRunnable(EndpointsForToken targets) {
            this.targets = targets;
        }

        /**
         * Executes {@link #runMayThrow()}, rethrowing any {@link Exception} wrapped in
         * a {@link RuntimeException}, and always releases the in-progress counters.
         */
        @Override
        public void run() {
            try {
                runMayThrow();
            }
            catch (Exception failure) {
                throw new RuntimeException(failure);
            }
            finally {
                releaseInProgressCounters();
            }
        }

        /** Decrements the global in-progress total, then the counter of every target endpoint. */
        private void releaseInProgressCounters() {
            StorageMetrics.totalHintsInProgress.dec((long)targets.size());
            for (InetAddressAndPort endpoint : targets.endpoints()) {
                StorageProxy.getHintsInProgressFor(endpoint).decrementAndGet();
            }
        }

        protected abstract void runMayThrow() throws Exception;
    }

    /**
     * Base class for a mutation task applied on the local replica.
     * <p>
     * When the task only starts after the expiry computed by its {@link #verb()},
     * it is not run directly: a self-dropped message is recorded and the same work
     * is resubmitted as a hint via {@code StorageProxy.submitHint}.
     */
    private static abstract class LocalMutationRunnable
    implements DebuggableTask.RunnableDebuggableTask {
        private final long approxCreationTimeNanos = MonotonicClock.Global.approxTime.now();
        private volatile long approxStartTimeNanos;
        private final Replica localReplica;

        LocalMutationRunnable(Replica localReplica) {
            this.localReplica = localReplica;
        }

        /**
         * Runs {@link #runMayThrow()} unless the task has already expired, in which
         * case the work is handed to a hint instead. Any {@link Exception} from a
         * direct run is rethrown wrapped in a {@link RuntimeException}.
         */
        @Override
        public final void run() {
            Verb verb = this.verb();
            this.approxStartTimeNanos = MonotonicClock.Global.approxTime.now();
            long expirationTimeNanos = verb.expiresAtNanos(this.approxCreationTimeNanos);
            if (this.approxStartTimeNanos > expirationTimeNanos) {
                long timeTakenNanos = this.approxStartTimeNanos - this.approxCreationTimeNanos;
                // NOTE(review): the drop is always recorded under MUTATION_REQ rather than
                // the value of verb() — confirm this is intended for non-MUTATION_REQ verbs.
                MessagingService.instance().metrics.recordSelfDroppedMessage(Verb.MUTATION_REQ, timeTakenNanos, TimeUnit.NANOSECONDS);
                HintRunnable runnable = new HintRunnable(EndpointsForToken.of((Token)this.localReplica.range().right, this.localReplica)){

                    @Override
                    protected void runMayThrow() throws Exception {
                        // Must be qualified: an unqualified this.runMayThrow() would resolve to
                        // this anonymous HintRunnable and recurse until the stack overflows.
                        LocalMutationRunnable.this.runMayThrow();
                    }
                };
                StorageProxy.submitHint(runnable);
                return;
            }
            try {
                this.runMayThrow();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** @return the approximate time, in nanoseconds, at which this task was created */
        @Override
        public long creationTimeNanos() {
            return this.approxCreationTimeNanos;
        }

        /** @return the approximate time, in nanoseconds, recorded when {@link #run()} began */
        @Override
        public long startTimeNanos() {
            return this.approxStartTimeNanos;
        }

        @Override
        public abstract String description();

        /** @return the verb whose expiry decides whether this task is still run directly */
        protected abstract Verb verb();

        /** The actual mutation work; run directly or, after expiry, from within a hint. */
        protected abstract void runMayThrow() throws Exception;
    }

    /**
     * Runnable that is silently dropped when it starts after the expiry its verb
     * computes from the creation time. A drop is recorded as a self-dropped message.
     */
    private static abstract class DroppableRunnable
    implements Runnable {
        protected final long approxCreationTimeNanos = MonotonicClock.Global.approxTime.now();
        protected volatile long approxStartTimeNanos;
        final Verb verb;

        public DroppableRunnable(Verb verb) {
            this.verb = verb;
        }

        /**
         * Records the start time, then either runs {@link #runMayThrow()} (wrapping any
         * {@link Exception} in a {@link RuntimeException}) or records the drop.
         */
        @Override
        public final void run() {
            final long startedAtNanos = MonotonicClock.Global.approxTime.now();
            approxStartTimeNanos = startedAtNanos;
            final long deadlineNanos = verb.expiresAtNanos(approxCreationTimeNanos);

            if (startedAtNanos <= deadlineNanos) {
                try {
                    runMayThrow();
                }
                catch (Exception failure) {
                    throw new RuntimeException(failure);
                }
            } else {
                // Too late to be useful: account for the drop and do nothing else.
                final long queuedForNanos = startedAtNanos - approxCreationTimeNanos;
                MessagingService.instance().metrics.recordSelfDroppedMessage(verb, queuedForNanos, TimeUnit.NANOSECONDS);
            }
        }

        protected abstract void runMayThrow() throws Exception;
    }

    /**
     * Batchlog response handler that also feeds the view-write metrics: the candidate
     * replica count is added to {@code viewReplicasAttempted} on construction, and
     * {@code viewReplicasSuccess} is incremented for every response.
     */
    private static class ViewWriteMetricsWrapped
    extends BatchlogResponseHandler<IMutation> {
        public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogResponseHandler.BatchlogCleanup cleanup, long queryStartNanoTime) {
            super(writeHandler, i, cleanup, queryStartNanoTime);
            long attempted = candidateReplicaCount();
            ClientRequestsMetricsHolder.viewWriteMetrics.viewReplicasAttempted.inc(attempted);
        }

        /** Delegates to the superclass first, then counts the response as a success. */
        @Override
        public void onResponse(Message<IMutation> msg) {
            super.onResponse(msg);
            ClientRequestsMetricsHolder.viewWriteMetrics.viewReplicasSuccess.inc();
        }
    }

    /**
     * Callback that carries out a write for a mutation against a write replica plan.
     */
    public interface WritePerformer {
        /**
         * @param mutation        the mutation to write
         * @param targets         the write replica plan to apply it to
         * @param responseHandler handler passed along with the write
         * @param localDataCenter data center name passed along with the write
         * @throws OverloadedException declared by the contract; raised by implementations
         */
        void apply(IMutation mutation, ReplicaPlan.ForWrite targets, AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter) throws OverloadedException;
    }

    /**
     * Executes a {@link ReadCommand} against the local node and reports the outcome
     * to a {@link ReadCallback}. Inherits the drop-if-expired behaviour of
     * {@code DroppableRunnable} using {@code Verb.READ_REQ}.
     */
    public static class LocalReadRunnable
    extends DroppableRunnable
    implements DebuggableTask.RunnableDebuggableTask {
        private final ReadCommand command;
        private final ReadCallback handler;
        private final boolean trackRepairedStatus;

        /** Creates a runnable that does not track repaired status. */
        public LocalReadRunnable(ReadCommand command, ReadCallback handler) {
            this(command, handler, false);
        }

        /**
         * @param command             the read to execute locally
         * @param handler             callback that receives the response or failure
         * @param trackRepairedStatus passed to {@code command.executionController}
         */
        public LocalReadRunnable(ReadCommand command, ReadCallback handler, boolean trackRepairedStatus) {
            super(Verb.READ_REQ);
            this.command = command;
            this.handler = handler;
            this.trackRepairedStatus = trackRepairedStatus;
        }

        /**
         * Runs the read and notifies the handler exactly once per outcome:
         * a response when the command completes, a failure when it does not, and a
         * failure with the matching reason when execution throws.
         */
        @Override
        protected void runMayThrow() {
            try {
                MessageParams.reset();
                boolean readRejected = false;
                this.command.setMonitoringTime(this.approxCreationTimeNanos, false, this.verb.expiresAfterNanos(), DatabaseDescriptor.getSlowQueryTimeout(TimeUnit.NANOSECONDS));

                ReadResponse response;
                try (ReadExecutionController controller = this.command.executionController(this.trackRepairedStatus);
                     UnfilteredPartitionIterator iterator = this.command.executeLocally(controller)) {
                    response = this.command.createResponse(iterator, controller.getRepairedDataInfo());
                }
                catch (RejectException e) {
                    // A rejection is only tolerated when the command tracks warnings;
                    // in that case an empty response stands in for the data.
                    if (!this.command.isTrackingWarnings()) {
                        throw e;
                    }
                    response = this.command.createEmptyResponse();
                    readRejected = true;
                }
                catch (QueryCancelledException e) {
                    logger.debug("Query cancelled (timeout)", (Throwable)e);
                    response = null;
                    assert !this.command.isCompleted() : "Local read marked as completed despite being aborted by timeout to table " + this.command.metadata();
                }

                if (this.command.complete()) {
                    this.handler.response(response);
                } else {
                    MessagingService.instance().metrics.recordSelfDroppedMessage(this.verb, MonotonicClock.Global.approxTime.now() - this.approxCreationTimeNanos, TimeUnit.NANOSECONDS);
                    this.handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.UNKNOWN);
                }

                // Rejected reads are excluded from the latency subscribers.
                if (!readRejected) {
                    MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(), MonotonicClock.Global.approxTime.now() - this.approxCreationTimeNanos, TimeUnit.NANOSECONDS);
                }
            }
            catch (Throwable t) {
                if (t instanceof TombstoneOverwhelmingException) {
                    // Reported with its specific reason and logged; it must not fall
                    // through to the generic branch, which would report a second,
                    // conflicting failure for the same endpoint.
                    this.handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
                    logger.error(t.getMessage());
                } else {
                    this.handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.UNKNOWN);
                    throw t;
                }
            }
        }

        /** @return the approximate time, in nanoseconds, at which this task was created */
        @Override
        public long creationTimeNanos() {
            return this.approxCreationTimeNanos;
        }

        /** @return the approximate time, in nanoseconds, recorded when the task started running */
        @Override
        public long startTimeNanos() {
            return this.approxStartTimeNanos;
        }

        /** @return the CQL string form of the wrapped command */
        @Override
        public String description() {
            return this.command.toCQLString();
        }
    }

    /**
     * Simple holder pairing a batchlog response handler with a mutation.
     */
    private static class WriteResponseHandlerWrapper {
        final BatchlogResponseHandler<IMutation> handler;
        final Mutation mutation;

        WriteResponseHandlerWrapper(BatchlogResponseHandler<IMutation> responseHandler, Mutation pairedMutation) {
            handler = responseHandler;
            mutation = pairedMutation;
        }
    }
}

