/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.distribution;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.TopologyAffectedCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.functional.FunctionalCommand;
import org.infinispan.commands.functional.Mutation;
import org.infinispan.commands.functional.ReadOnlyKeyCommand;
import org.infinispan.commands.functional.ReadOnlyManyCommand;
import org.infinispan.commands.functional.ReadWriteKeyCommand;
import org.infinispan.commands.functional.ReadWriteKeyValueCommand;
import org.infinispan.commands.functional.ReadWriteManyCommand;
import org.infinispan.commands.functional.ReadWriteManyEntriesCommand;
import org.infinispan.commands.functional.TxReadOnlyKeyCommand;
import org.infinispan.commands.functional.TxReadOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyKeyCommand;
import org.infinispan.commands.functional.WriteOnlyKeyValueCommand;
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.tx.TransactionBoundaryCommand;
import org.infinispan.commands.tx.VersionedCommitCommand;
import org.infinispan.commands.write.AbstractDataWriteCommand;
import org.infinispan.commands.write.ComputeCommand;
import org.infinispan.commands.write.ComputeIfAbsentCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.ValueMatcher;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.container.versioning.EntryVersionsMap;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.functional.EntryView;
import org.infinispan.functional.impl.EntryViews;
import org.infinispan.interceptors.distribution.BaseDistributionInterceptor;
import org.infinispan.partitionhandling.impl.PartitionHandlingManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.responses.UnsureResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.AllOwnersLostException;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.transaction.impl.AbstractCacheTransaction;
import org.infinispan.transaction.impl.LocalTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class TxDistributionInterceptor
extends BaseDistributionInterceptor {
    private static final Log log = LogFactory.getLog(TxDistributionInterceptor.class);
    private static final boolean trace = log.isTraceEnabled();
    private static final long SKIP_REMOTE_FLAGS = FlagBitSets.CACHE_MODE_LOCAL | FlagBitSets.SKIP_REMOTE_LOOKUP;
    private PartitionHandlingManager partitionHandlingManager;
    private boolean forceRemoteReadForFunctionalCommands;
    private final TxReadOnlyManyHelper txReadOnlyManyHelper = new TxReadOnlyManyHelper();
    private final ReadWriteManyHelper readWriteManyHelper = new ReadWriteManyHelper();
    private final ReadWriteManyEntriesHelper readWriteManyEntriesHelper = new ReadWriteManyEntriesHelper();

    @Inject
    public void inject(PartitionHandlingManager partitionHandlingManager) {
        this.partitionHandlingManager = partitionHandlingManager;
    }

    @Override
    public void configure() {
        super.configure();
        this.forceRemoteReadForFunctionalCommands = this.cacheConfiguration.sites().hasEnabledBackups();
    }

    @Override
    public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
        return this.handleTxWriteCommand(ctx, command, command.getKey());
    }

    @Override
    public Object visitComputeCommand(InvocationContext ctx, ComputeCommand command) throws Throwable {
        return this.handleTxWriteCommand(ctx, command, command.getKey());
    }

    @Override
    public Object visitComputeIfAbsentCommand(InvocationContext ctx, ComputeIfAbsentCommand command) throws Throwable {
        return this.handleTxWriteCommand(ctx, command, command.getKey());
    }

    private void updateMatcherForRetry(WriteCommand command) {
        command.setValueMatcher(command.isSuccessful() ? ValueMatcher.MATCH_ALWAYS : ValueMatcher.MATCH_NEVER);
    }

    @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
        return this.handleTxWriteCommand(ctx, command, command.getKey());
    }

    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
        if (command.hasAnyFlag(FlagBitSets.PUT_FOR_EXTERNAL_READ)) {
            return this.handleNonTxWriteCommand(ctx, command);
        }
        return this.handleTxWriteCommand(ctx, command, command.getKey());
    }

    @Override
    public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
        return this.handleTxWriteManyEntriesCommand(ctx, command, command.getMap(), (c, entries) -> new PutMapCommand((PutMapCommand)c).withMap((Map<Object, Object>)entries));
    }

    @Override
    public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
        if (ctx.isOriginLocal()) {
            TxInvocationContext localTxCtx = ctx;
            Collection<Address> affectedNodes = this.checkTopologyId(command).getWriteOwners(command.getKeys());
            Collection<Address> recipients = this.isReplicated ? null : affectedNodes;
            ((LocalTransaction)localTxCtx.getCacheTransaction()).locksAcquired(affectedNodes);
            log.tracef("Registered remote locks acquired %s", (Object)affectedNodes);
            RpcOptions rpcOptions = this.rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, DeliverOrder.NONE).build();
            CompletableFuture<Map<Address, Response>> remoteInvocation = this.rpcManager.invokeRemotelyAsync(recipients, command, rpcOptions);
            return TxDistributionInterceptor.asyncValue(remoteInvocation.thenApply(responses -> {
                this.checkTxCommandResponses((Map<Address, Response>)responses, command, localTxCtx, ((LocalTransaction)localTxCtx.getCacheTransaction()).getRemoteLocksAcquired());
                return null;
            }));
        }
        return this.invokeNext(ctx, command);
    }

    @Override
    public Object visitWriteOnlyKeyCommand(InvocationContext ctx, WriteOnlyKeyCommand command) throws Throwable {
        return this.handleTxFunctionalCommand(ctx, command);
    }

    @Override
    public Object visitReadWriteKeyValueCommand(InvocationContext ctx, ReadWriteKeyValueCommand command) throws Throwable {
        return this.handleTxFunctionalCommand(ctx, command);
    }

    @Override
    public Object visitReadWriteKeyCommand(InvocationContext ctx, ReadWriteKeyCommand command) throws Throwable {
        return this.handleTxFunctionalCommand(ctx, command);
    }

    @Override
    public Object visitWriteOnlyManyEntriesCommand(InvocationContext ctx, WriteOnlyManyEntriesCommand command) throws Throwable {
        return this.handleTxWriteManyEntriesCommand(ctx, command, command.getEntries(), (c, entries) -> new WriteOnlyManyEntriesCommand(c).withEntries(entries));
    }

    @Override
    public Object visitWriteOnlyKeyValueCommand(InvocationContext ctx, WriteOnlyKeyValueCommand command) throws Throwable {
        return this.handleTxFunctionalCommand(ctx, command);
    }

    @Override
    public Object visitWriteOnlyManyCommand(InvocationContext ctx, WriteOnlyManyCommand command) throws Throwable {
        return this.handleTxWriteManyCommand(ctx, command, command.getAffectedKeys(), (c, keys) -> new WriteOnlyManyCommand(c).withKeys(keys));
    }

    @Override
    public Object visitReadWriteManyCommand(InvocationContext ctx, ReadWriteManyCommand command) throws Throwable {
        if (ctx.isOriginLocal()) {
            if (this.forceRemoteReadForFunctionalCommands && !command.hasAnyFlag(FlagBitSets.SKIP_XSITE_BACKUP)) {
                CompletableFuture<Void> cf = this.remoteGetAll(ctx, command, command.getAffectedKeys(), RemoteGetAllForWriteHandler.INSTANCE);
                return this.asyncInvokeNext(ctx, (VisitableCommand)command, cf);
            }
            return this.handleFunctionalReadManyCommand(ctx, command, this.readWriteManyHelper);
        }
        return this.handleTxWriteManyCommand(ctx, command, command.getAffectedKeys(), this.readWriteManyHelper::copyForLocal);
    }

    @Override
    public Object visitReadWriteManyEntriesCommand(InvocationContext ctx, ReadWriteManyEntriesCommand command) throws Throwable {
        if (ctx.isOriginLocal()) {
            if (this.forceRemoteReadForFunctionalCommands && !command.hasAnyFlag(FlagBitSets.SKIP_XSITE_BACKUP)) {
                CompletableFuture<Void> cf = this.remoteGetAll(ctx, command, command.getAffectedKeys(), RemoteGetAllForWriteHandler.INSTANCE);
                return this.asyncInvokeNext(ctx, (VisitableCommand)command, cf);
            }
            return this.handleFunctionalReadManyCommand(ctx, command, this.readWriteManyEntriesHelper);
        }
        return this.handleTxWriteManyEntriesCommand(ctx, command, command.getEntries(), (c, entries) -> new ReadWriteManyEntriesCommand((ReadWriteManyEntriesCommand)c).withEntries(entries));
    }

    @Override
    public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
        return this.handleSecondPhaseCommand(ctx, command);
    }

    @Override
    public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
        if (!ctx.isOriginLocal()) {
            return this.invokeNext(ctx, command);
        }
        return this.invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> {
            if (!this.shouldInvokeRemoteTxCommand(ctx)) {
                return null;
            }
            TxInvocationContext localTxCtx = (TxInvocationContext)rCtx;
            LocalTransaction localTx = (LocalTransaction)localTxCtx.getCacheTransaction();
            LocalizedCacheTopology cacheTopology = this.checkTopologyId(command);
            Collection<Address> writeOwners = cacheTopology.getWriteOwners(localTxCtx.getAffectedKeys());
            localTx.locksAcquired(writeOwners);
            Collection<Address> recipients = this.isReplicated ? null : localTx.getCommitNodes(writeOwners, cacheTopology);
            CompletableFuture<Object> remotePrepare = this.prepareOnAffectedNodes(localTxCtx, (PrepareCommand)rCommand, recipients);
            return TxDistributionInterceptor.asyncValue(remotePrepare);
        });
    }

    protected CompletableFuture<Object> prepareOnAffectedNodes(TxInvocationContext<?> ctx, PrepareCommand command, Collection<Address> recipients) {
        try {
            CompletableFuture<Map<Address, Response>> remoteInvocation = this.rpcManager.invokeRemotelyAsync(recipients, command, this.createRpcOptions());
            return remoteInvocation.handle((responses, t) -> {
                TxDistributionInterceptor.transactionRemotelyPrepared(ctx);
                CompletableFutures.rethrowException(t);
                this.checkTxCommandResponses((Map<Address, Response>)responses, command, (LocalTxInvocationContext)ctx, recipients);
                return null;
            });
        }
        catch (Throwable t2) {
            TxDistributionInterceptor.transactionRemotelyPrepared(ctx);
            throw t2;
        }
    }

    @Override
    public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
        return this.handleSecondPhaseCommand(ctx, command);
    }

    private Object handleSecondPhaseCommand(TxInvocationContext ctx, TransactionBoundaryCommand command) {
        if (this.shouldInvokeRemoteTxCommand(ctx)) {
            Collection<Address> recipients = this.getCommitNodes(ctx, command);
            CompletableFuture<Map<Address, Response>> remoteInvocation = this.rpcManager.invokeRemotelyAsync(recipients, command, this.createRpcOptions());
            return TxDistributionInterceptor.asyncValue(remoteInvocation.thenApply(responses -> {
                this.checkTxCommandResponses((Map<Address, Response>)responses, command, ctx, recipients);
                return null;
            }));
        }
        return this.invokeNext(ctx, command);
    }

    private Collection<Address> getCommitNodes(TxInvocationContext ctx, TopologyAffectedCommand command) {
        LocalTransaction localTx = (LocalTransaction)ctx.getCacheTransaction();
        LocalizedCacheTopology cacheTopology = this.checkTopologyId(command);
        Collection<Address> affectedNodes = this.isReplicated ? null : cacheTopology.getWriteOwners(ctx.getAffectedKeys());
        return localTx.getCommitNodes(affectedNodes, cacheTopology);
    }

    protected void checkTxCommandResponses(Map<Address, Response> responseMap, TransactionBoundaryCommand command, TxInvocationContext<LocalTransaction> context, Collection<Address> recipients) {
        LocalizedCacheTopology cacheTopology = this.checkTopologyId(command);
        for (Map.Entry<Address, Response> e : responseMap.entrySet()) {
            Address recipient = e.getKey();
            Response response = e.getValue();
            if (response == CacheNotFoundResponse.INSTANCE) {
                if (!cacheTopology.getMembers().contains(recipient)) {
                    if (!trace) continue;
                    log.tracef("Ignoring response from node not targeted %s", (Object)recipient);
                    continue;
                }
                if (this.checkCacheNotFoundResponseInPartitionHandling(command, context, recipients)) {
                    if (trace) {
                        log.tracef("Cache not running on node %s, or the node is missing. It will be handled by the PartitionHandlingManager", (Object)recipient);
                    }
                    return;
                }
                if (trace) {
                    log.tracef("Cache not running on node %s, or the node is missing", (Object)recipient);
                }
                throw OutdatedTopologyException.INSTANCE;
            }
            if (response != UnsureResponse.INSTANCE) continue;
            if (trace) {
                log.tracef("Node %s has a newer topology id", (Object)recipient);
            }
            throw OutdatedTopologyException.INSTANCE;
        }
    }

    private boolean checkCacheNotFoundResponseInPartitionHandling(TransactionBoundaryCommand command, TxInvocationContext<LocalTransaction> context, Collection<Address> recipients) {
        GlobalTransaction globalTransaction = command.getGlobalTransaction();
        Set<Object> lockedKeys = context.getLockedKeys();
        if (command instanceof RollbackCommand) {
            return this.partitionHandlingManager.addPartialRollbackTransaction(globalTransaction, recipients, lockedKeys);
        }
        if (command instanceof PrepareCommand) {
            if (((PrepareCommand)command).isOnePhaseCommit()) {
                return this.partitionHandlingManager.addPartialCommit1PCTransaction(globalTransaction, recipients, lockedKeys, Arrays.asList(((PrepareCommand)command).getModifications()));
            }
        } else if (command instanceof CommitCommand) {
            EntryVersionsMap newVersion = null;
            if (command instanceof VersionedCommitCommand) {
                newVersion = ((VersionedCommitCommand)command).getUpdatedVersions();
            }
            return this.partitionHandlingManager.addPartialCommit2PCTransaction(globalTransaction, recipients, lockedKeys, newVersion);
        }
        return false;
    }

    private Object handleTxWriteCommand(InvocationContext ctx, AbstractDataWriteCommand command, Object key) throws Throwable {
        try {
            if (!ctx.isOriginLocal() && !this.checkTopologyId(command).isWriteOwner(command.getKey())) {
                return null;
            }
            CacheEntry entry = ctx.lookupEntry(command.getKey());
            if (entry == null) {
                if (this.isLocalModeForced(command) || command.hasAnyFlag(FlagBitSets.SKIP_REMOTE_LOOKUP) || !this.needsPreviousValue(ctx, command)) {
                    this.entryFactory.wrapExternalEntry(ctx, key, null, false, true);
                } else {
                    Object result = this.asyncInvokeNext(ctx, (VisitableCommand)command, this.remoteGet(ctx, command, command.getKey(), true));
                    return TxDistributionInterceptor.makeStage(result).andFinally(ctx, command, (rCtx, rCommand, rv, t) -> this.updateMatcherForRetry((WriteCommand)rCommand));
                }
            }
            return this.invokeNextAndFinally(ctx, command, (rCtx, rCommand, rv, t) -> this.updateMatcherForRetry((WriteCommand)rCommand));
        }
        catch (Throwable t2) {
            this.updateMatcherForRetry(command);
            throw t2;
        }
    }

    protected <C extends TopologyAffectedCommand & FlagAffectedCommand, K, V> Object handleTxWriteManyEntriesCommand(InvocationContext ctx, C command, Map<K, V> entries, BiFunction<C, Map<K, V>, C> copyCommand) {
        boolean ignorePreviousValue = ((FlagAffectedCommand)command).hasAnyFlag(SKIP_REMOTE_FLAGS) || ((VisitableCommand)command).loadType() == VisitableCommand.LoadType.DONT_LOAD;
        HashMap<K, V> filtered = new HashMap<K, V>(entries.size());
        ArrayList<CompletableFuture<Void>> remoteGets = null;
        LocalizedCacheTopology cacheTopology = this.checkTopologyId(command);
        for (Map.Entry<K, V> e : entries.entrySet()) {
            K key = e.getKey();
            if (!ctx.isOriginLocal() && !cacheTopology.isWriteOwner(key)) continue;
            if (ctx.lookupEntry(key) == null) {
                if (ignorePreviousValue) {
                    this.entryFactory.wrapExternalEntry(ctx, key, null, false, true);
                } else {
                    if (remoteGets == null) {
                        remoteGets = new ArrayList<CompletableFuture<Void>>();
                    }
                    remoteGets.add(this.remoteGet(ctx, command, key, true));
                }
            }
            filtered.put(key, e.getValue());
        }
        return this.asyncInvokeNext(ctx, (VisitableCommand)copyCommand.apply(command, filtered), remoteGets);
    }

    protected <C extends VisitableCommand & FlagAffectedCommand, K> Object handleTxWriteManyCommand(InvocationContext ctx, C command, Collection<K> keys, BiFunction<C, List<K>, C> copyCommand) {
        boolean ignorePreviousValue = ((FlagAffectedCommand)command).hasAnyFlag(SKIP_REMOTE_FLAGS) || command.loadType() == VisitableCommand.LoadType.DONT_LOAD;
        ArrayList<K> filtered = new ArrayList<K>(keys.size());
        ArrayList<CompletableFuture<Void>> remoteGets = null;
        LocalizedCacheTopology cacheTopology = this.checkTopologyId((TopologyAffectedCommand)command);
        for (K key : keys) {
            if (!ctx.isOriginLocal() && !cacheTopology.isWriteOwner(key)) continue;
            if (ctx.lookupEntry(key) == null) {
                if (ignorePreviousValue) {
                    this.entryFactory.wrapExternalEntry(ctx, key, null, false, true);
                } else {
                    if (remoteGets == null) {
                        remoteGets = new ArrayList<CompletableFuture<Void>>();
                    }
                    remoteGets.add(this.remoteGet(ctx, command, key, true));
                }
            }
            filtered.add(key);
        }
        return this.asyncInvokeNext(ctx, (VisitableCommand)copyCommand.apply(command, filtered), remoteGets);
    }

    public <C extends AbstractDataWriteCommand> Object handleTxFunctionalCommand(InvocationContext ctx, C command) {
        Object key = command.getKey();
        if (ctx.isOriginLocal()) {
            CacheEntry entry = ctx.lookupEntry(key);
            if (entry == null) {
                if (command.hasAnyFlag(SKIP_REMOTE_FLAGS) || command.loadType() == VisitableCommand.LoadType.DONT_LOAD) {
                    this.entryFactory.wrapExternalEntry(ctx, key, null, false, true);
                    return this.invokeNext(ctx, command);
                }
                if (this.forceRemoteReadForFunctionalCommands && !command.hasAnyFlag(FlagBitSets.SKIP_XSITE_BACKUP)) {
                    return this.asyncInvokeNext(ctx, command, this.remoteGet(ctx, command, key, true));
                }
                LocalizedCacheTopology cacheTopology = this.checkTopologyId(command);
                List<Address> owners = cacheTopology.getDistribution(key).readOwners();
                List mutationsOnKey = TxDistributionInterceptor.getMutationsOnKey((TxInvocationContext)ctx, key);
                mutationsOnKey.add(((FunctionalCommand)((Object)command)).toMutation(key));
                TxReadOnlyKeyCommand remoteRead = new TxReadOnlyKeyCommand(key, mutationsOnKey);
                return TxDistributionInterceptor.asyncValue(this.rpcManager.invokeRemotelyAsync(owners, remoteRead, this.getStaggeredOptions(owners.size())).thenApply(responses -> {
                    for (Response r : responses.values()) {
                        if (!(r instanceof SuccessfulResponse)) continue;
                        SuccessfulResponse response = (SuccessfulResponse)r;
                        Object responseValue = response.getResponseValue();
                        return this.unwrapFunctionalResultOnOrigin(ctx, command.getKey(), responseValue);
                    }
                    throw TxDistributionInterceptor.handleMissingSuccessfulResponse(responses);
                }));
            }
            return this.invokeNext(ctx, command);
        }
        if (!this.checkTopologyId(command).isWriteOwner(key)) {
            return null;
        }
        CacheEntry entry = ctx.lookupEntry(key);
        if (entry == null) {
            if (command.hasAnyFlag(SKIP_REMOTE_FLAGS) || command.loadType() == VisitableCommand.LoadType.DONT_LOAD) {
                this.entryFactory.wrapExternalEntry(ctx, key, null, false, true);
            } else {
                return this.asyncInvokeNext(ctx, command, this.remoteGet(ctx, command, command.getKey(), true));
            }
        }
        return this.invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> this.wrapFunctionalResultOnNonOriginOnReturn(rv, entry));
    }

    private boolean needsPreviousValue(InvocationContext ctx, FlagAffectedCommand command) {
        switch (command.loadType()) {
            case DONT_LOAD: {
                return false;
            }
            case PRIMARY: {
                return ctx.isOriginLocal();
            }
            case OWNER: {
                return true;
            }
        }
        throw new IllegalStateException();
    }

    @Override
    public Object visitReadOnlyManyCommand(InvocationContext ctx, ReadOnlyManyCommand command) throws Throwable {
        return this.handleFunctionalReadManyCommand(ctx, command, this.txReadOnlyManyHelper);
    }

    @Override
    protected ReadOnlyKeyCommand remoteReadOnlyCommand(InvocationContext ctx, ReadOnlyKeyCommand command) {
        if (!ctx.isInTxScope()) {
            return command;
        }
        return new TxReadOnlyKeyCommand(command, TxDistributionInterceptor.getMutationsOnKey((TxInvocationContext)ctx, command.getKey()));
    }

    @Override
    protected <C extends FlagAffectedCommand & TopologyAffectedCommand> CompletableFuture<Void> remoteGet(InvocationContext ctx, C command, Object key, boolean isWrite) {
        CompletableFuture<Void> cf = super.remoteGet(ctx, command, key, isWrite);
        if (!ctx.isOriginLocal() || !ctx.isInTxScope()) {
            return cf;
        }
        List<Mutation> mutationsOnKey = TxDistributionInterceptor.getMutationsOnKey((TxInvocationContext)ctx, key);
        if (mutationsOnKey.isEmpty()) {
            return cf;
        }
        return cf.thenRun(() -> {
            this.entryFactory.wrapEntryForWriting(ctx, key, false, true);
            MVCCEntry cacheEntry = (MVCCEntry)ctx.lookupEntry(key);
            EntryView.ReadWriteEntryView readWriteEntryView = EntryViews.readWrite(cacheEntry);
            for (Mutation mutation : mutationsOnKey) {
                mutation.apply(readWriteEntryView);
                cacheEntry.updatePreviousValue();
            }
        });
    }

    @Override
    protected void handleRemotelyRetrievedKeys(InvocationContext ctx, List<?> remoteKeys) {
        if (!ctx.isInTxScope()) {
            return;
        }
        List<List<Mutation>> mutations = TxDistributionInterceptor.getMutations(ctx, remoteKeys);
        if (mutations == null || mutations.isEmpty()) {
            return;
        }
        Iterator<?> keysIterator = remoteKeys.iterator();
        Iterator<List<Mutation>> mutationsIterator = mutations.iterator();
        while (keysIterator.hasNext() && mutationsIterator.hasNext()) {
            Object key = keysIterator.next();
            this.entryFactory.wrapEntryForWriting(ctx, key, false, true);
            MVCCEntry cacheEntry = (MVCCEntry)ctx.lookupEntry(key);
            EntryView.ReadWriteEntryView readWriteEntryView = EntryViews.readWrite(cacheEntry);
            for (Mutation mutation : mutationsIterator.next()) {
                mutation.apply(readWriteEntryView);
                cacheEntry.updatePreviousValue();
            }
        }
        assert (!keysIterator.hasNext());
        assert (!mutationsIterator.hasNext());
    }

    protected RpcOptions createRpcOptions() {
        return this.rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, DeliverOrder.NONE).build();
    }

    private static List<Mutation> getMutationsOnKey(TxInvocationContext ctx, Object key) {
        ArrayList<Mutation> mutations = new ArrayList<Mutation>();
        for (WriteCommand write : ((AbstractCacheTransaction)ctx.getCacheTransaction()).getModifications()) {
            if (!write.getAffectedKeys().contains(key)) continue;
            if (write instanceof FunctionalCommand) {
                mutations.add(((FunctionalCommand)((Object)write)).toMutation(key));
                continue;
            }
            throw new IllegalStateException("Attempt to remote functional read after non-functional modification! key=" + key + ", modification=" + write);
        }
        return mutations;
    }

    private static List<List<Mutation>> getMutations(InvocationContext ctx, List<?> keys) {
        if (!ctx.isInTxScope()) {
            return null;
        }
        log.tracef("Looking up mutations for %s", (Object)keys);
        TxInvocationContext txCtx = (TxInvocationContext)ctx;
        ArrayList<List<Mutation>> mutations = new ArrayList<List<Mutation>>(keys.size());
        for (int i = keys.size(); i > 0; --i) {
            mutations.add(Collections.emptyList());
        }
        for (WriteCommand write : ((AbstractCacheTransaction)txCtx.getCacheTransaction()).getModifications()) {
            for (int i = 0; i < keys.size(); ++i) {
                Object key = keys.get(i);
                if (!write.getAffectedKeys().contains(key)) continue;
                if (write instanceof FunctionalCommand) {
                    ArrayList list = (ArrayList)mutations.get(i);
                    if (list.isEmpty()) {
                        list = new ArrayList();
                        mutations.set(i, list);
                    }
                    list.add(((FunctionalCommand)((Object)write)).toMutation(key));
                    continue;
                }
                throw new IllegalStateException("Attempt to remote functional read after non-functional modification! key=" + key + ", modification=" + write);
            }
        }
        return mutations;
    }

    private class ReadWriteManyEntriesHelper
    extends BaseFunctionalWriteHelper<ReadWriteManyEntriesCommand> {
        private ReadWriteManyEntriesHelper() {
        }

        @Override
        public ReadWriteManyEntriesCommand copyForLocal(ReadWriteManyEntriesCommand command, List<Object> keys) {
            return new ReadWriteManyEntriesCommand(command).withEntries(this.filterEntries(command.getEntries(), keys));
        }

        private <K, V> Map<K, V> filterEntries(Map<K, V> originalEntries, List<K> keys) {
            HashMap<K, V> entries = new HashMap<K, V>(keys.size());
            for (K key : keys) {
                entries.put(key, originalEntries.get(key));
            }
            return entries;
        }
    }

    private class ReadWriteManyHelper
    extends BaseFunctionalWriteHelper<ReadWriteManyCommand> {
        private ReadWriteManyHelper() {
        }

        @Override
        public ReadWriteManyCommand copyForLocal(ReadWriteManyCommand command, List<Object> keys) {
            return new ReadWriteManyCommand(command).withKeys(keys);
        }
    }

    private abstract class BaseFunctionalWriteHelper<C extends FunctionalCommand & WriteCommand>
    implements BaseDistributionInterceptor.ReadManyCommandHelper<C> {
        private BaseFunctionalWriteHelper() {
        }

        @Override
        public Collection<?> keys(C command) {
            return ((WriteCommand)command).getAffectedKeys();
        }

        @Override
        public ReplicableCommand copyForRemote(C command, List<Object> keys, InvocationContext ctx) {
            List mutations = TxDistributionInterceptor.getMutations(ctx, keys);
            assert (mutations != null);
            for (int i = 0; i < keys.size(); ++i) {
                List list = (List)mutations.get(i);
                Mutation mutation = command.toMutation((Object)keys.get(i));
                if (list.isEmpty()) {
                    mutations.set(i, Collections.singletonList(mutation));
                    continue;
                }
                list.add(mutation);
            }
            return new TxReadOnlyManyCommand(keys, mutations);
        }

        @Override
        public void applyLocalResult(BaseDistributionInterceptor.MergingCompletableFuture allFuture, Object rv) {
            int pos = 0;
            for (Object value : (List)rv) {
                allFuture.results[pos++] = value;
            }
        }

        @Override
        public Object transformResult(Object[] results) {
            return Arrays.asList(results);
        }

        @Override
        public Object apply(InvocationContext rCtx, VisitableCommand rCommand, Object rv) throws Throwable {
            return TxDistributionInterceptor.this.wrapFunctionalManyResultOnNonOrigin(rCtx, ((WriteCommand)rCommand).getAffectedKeys(), ((List)rv).toArray());
        }
    }

    private class TxReadOnlyManyHelper
    extends BaseDistributionInterceptor.ReadOnlyManyHelper {
        private TxReadOnlyManyHelper() {
        }

        @Override
        public ReplicableCommand copyForRemote(ReadOnlyManyCommand command, List<Object> keys, InvocationContext ctx) {
            List mutations = TxDistributionInterceptor.getMutations(ctx, keys);
            if (mutations == null) {
                return new ReadOnlyManyCommand(command).withKeys(keys);
            }
            return new TxReadOnlyManyCommand(command, mutations).withKeys(keys);
        }
    }

    private static class RemoteGetAllForWriteHandler
    implements BaseDistributionInterceptor.RemoteGetAllHandler {
        private static RemoteGetAllForWriteHandler INSTANCE = new RemoteGetAllForWriteHandler();

        private RemoteGetAllForWriteHandler() {
        }

        @Override
        public void onUnsureResponse() {
            throw OutdatedTopologyException.INSTANCE;
        }

        @Override
        public void onKeysLost(Collection<?> lostKeys) {
            throw AllOwnersLostException.INSTANCE;
        }
    }
}

