/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.scattered.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import net.jcip.annotations.GuardedBy;
import org.infinispan.commands.remote.ClusteredGetAllCommand;
import org.infinispan.commands.write.InvalidateVersionsCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.container.InternalEntryFactory;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.container.entries.RemoteMetadata;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.container.versioning.SimpleClusteredVersion;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.filter.CollectionKeyFilter;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.metadata.InternalMetadata;
import org.infinispan.persistence.spi.AdvancedCacheLoader;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.scattered.ScatteredVersionManager;
import org.infinispan.statetransfer.InboundTransferTask;
import org.infinispan.statetransfer.StateConsumerImpl;
import org.infinispan.statetransfer.StateRequestCommand;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.ReadOnlyDataContainerBackedKeySet;
import org.infinispan.util.concurrent.WithinThreadExecutor;

public class ScatteredStateConsumerImpl
extends StateConsumerImpl {
    protected static final long SKIP_OWNERSHIP_FLAGS = FlagBitSets.SKIP_OWNERSHIP_CHECK;
    protected InternalEntryFactory entryFactory;
    protected ExecutorService asyncExecutor;
    protected ScatteredVersionManager svm;
    @GuardedBy(value="transferMapsLock")
    protected Set<Integer> inboundSegments;
    protected AtomicLong chunkCounter = new AtomicLong();
    protected final ConcurrentMap<Address, BlockingQueue<Object>> retrievedEntries = new ConcurrentHashMap<Address, BlockingQueue<Object>>();
    protected BlockingQueue<InternalCacheEntry> backupQueue;
    protected final ConcurrentMap<Address, BlockingQueue<KeyAndVersion>> invalidations = new ConcurrentHashMap<Address, BlockingQueue<KeyAndVersion>>();
    protected Collection<Address> backupAddress;
    protected Collection<Address> nonBackupAddresses;
    private int chunkSize;

    @Inject
    public void inject(InternalEntryFactory entryFactory, @ComponentName(value="org.infinispan.executors.transport") ExecutorService executorService, ScatteredVersionManager svm) {
        this.entryFactory = entryFactory;
        this.asyncExecutor = executorService;
        this.svm = svm;
    }

    @Override
    public void start() {
        super.start();
        this.chunkSize = this.configuration.clustering().stateTransfer().chunkSize();
        this.backupQueue = new ArrayBlockingQueue<InternalCacheEntry>(this.chunkSize);
    }

    @Override
    public CompletableFuture<Void> onTopologyUpdate(CacheTopology cacheTopology, boolean isRebalance) {
        Address nextMember = this.getNextMember(cacheTopology);
        this.backupAddress = nextMember == null ? Collections.emptySet() : Collections.singleton(nextMember);
        this.nonBackupAddresses = new ArrayList<Address>(cacheTopology.getActualMembers());
        this.nonBackupAddresses.remove(nextMember);
        this.nonBackupAddresses.remove(this.rpcManager.getAddress());
        return super.onTopologyUpdate(cacheTopology, isRebalance);
    }

    @Override
    protected void beforeTopologyInstalled(int topologyId, boolean startRebalance, ConsistentHash previousWriteCh, ConsistentHash newWriteCh) {
        for (int segment = 0; segment < newWriteCh.getNumSegments(); ++segment) {
            if (newWriteCh.isSegmentLocalToNode(this.rpcManager.getAddress(), segment)) continue;
            this.cancelTransfers(Collections.singleton(segment));
            this.svm.unregisterSegment(segment);
        }
        Set<Integer> addedSegments = this.getOwnedSegments(newWriteCh);
        if (previousWriteCh != null) {
            addedSegments.removeAll(this.getOwnedSegments(previousWriteCh));
        }
        this.svm.setTopologyId(topologyId);
        if (previousWriteCh == null || !this.isFetchEnabled) {
            log.trace("This is the first topology or state transfer is disabled, not expecting any state transfer.");
            this.svm.setOwnedSegments(addedSegments);
            return;
        }
        if (!addedSegments.isEmpty()) {
            this.svm.setValuesTransferTopology(topologyId);
        }
        for (int segment : addedSegments) {
            this.svm.registerSegment(segment);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void handleSegments(boolean startRebalance, Set<Integer> addedSegments, Set<Integer> removedSegments) {
        if (!startRebalance) {
            log.trace("This is not a rebalance, not doing anything...");
            return;
        }
        if (addedSegments.isEmpty()) {
            log.trace("No segments missing");
            return;
        }
        Object object = this.transferMapsLock;
        synchronized (object) {
            this.inboundSegments = new HashSet<Integer>(addedSegments);
        }
        this.chunkCounter.set(0L);
        ((CompletableFuture)this.rpcManager.invokeRemotelyAsync(null, this.commandsFactory.buildStateRequestCommand(StateRequestCommand.Type.CONFIRM_REVOKED_SEGMENTS, this.rpcManager.getAddress(), this.cacheTopology.getTopologyId(), null), this.synchronousIgnoreLeaversRpcOptions).whenComplete((responses, throwable) -> {
            if (throwable == null) {
                try {
                    this.svm.startKeyTransfer(addedSegments);
                    this.requestKeyTransfer(addedSegments);
                }
                catch (Throwable t) {
                    log.failedToRequestSegments(this.cacheName, null, addedSegments, t);
                }
            } else {
                if (this.cache.getAdvancedCache().getComponentRegistry().getStatus() == ComponentStatus.RUNNING) {
                    log.failedConfirmingRevokedSegments((Throwable)throwable);
                } else {
                    log.debug("Failed confirming revoked segments", (Throwable)throwable);
                }
                Iterator iterator = addedSegments.iterator();
                while (iterator.hasNext()) {
                    int segment = (Integer)iterator.next();
                    this.svm.notifyKeyTransferFinished(segment, false, false);
                }
                this.notifyEndOfStateTransferIfNeeded();
            }
        })).join();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void requestKeyTransfer(Set<Integer> segments) {
        boolean isTransferringKeys = false;
        Iterator<Integer> iterator = this.transferMapsLock;
        synchronized (iterator) {
            ArrayList<Address> members = new ArrayList<Address>(this.cacheTopology.getActualMembers());
            Collections.shuffle(members);
            for (Address source : members) {
                if (this.rpcManager.getAddress().equals(source)) continue;
                isTransferringKeys = true;
                InboundTransferTask inboundTransfer = new InboundTransferTask(segments, source, this.cacheTopology.getTopologyId(), this.rpcManager, this.commandsFactory, this.configuration.clustering().stateTransfer().timeout(), this.cacheName, true);
                this.addTransfer(inboundTransfer, segments);
                this.stateRequestExecutor.executeAsync(() -> {
                    log.tracef("Requesting keys for segments %s from %s", (Object)inboundTransfer.getSegments(), (Object)inboundTransfer.getSource());
                    return inboundTransfer.requestKeys().whenComplete((nil, e) -> this.onTaskCompletion(inboundTransfer));
                });
            }
        }
        if (!isTransferringKeys) {
            log.trace("No keys in transfer, finishing segments " + segments);
            for (int segment : segments) {
                this.svm.notifyKeyTransferFinished(segment, false, false);
            }
            this.notifyEndOfStateTransferIfNeeded();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void onTaskCompletion(InboundTransferTask inboundTransfer) {
        Iterator readCh;
        Set<Object> completedSegments = Collections.emptySet();
        if (trace) {
            log.tracef("Inbound transfer finished %s: %s", (Object)inboundTransfer, (Object)(inboundTransfer.isCompletedSuccessfully() ? "successfully" : "unsuccessfuly"));
        }
        Object object = this.transferMapsLock;
        synchronized (object) {
            PrimitiveIterator.OfInt ofInt = inboundTransfer.getSegments().iterator();
            block12: while (ofInt.hasNext()) {
                int segment = (Integer)ofInt.next();
                List transfers = (List)this.transfersBySegment.get(segment);
                if (transfers == null) {
                    log.tracef("Transfers for segment %d have not been found.", segment);
                    continue;
                }
                transfers.remove(inboundTransfer);
                if (!transfers.isEmpty()) continue;
                this.transfersBySegment.remove(segment);
                if (trace) {
                    log.tracef("All transfer tasks for segment %d have completed.", segment);
                }
                this.svm.notifyKeyTransferFinished(segment, inboundTransfer.isCompletedSuccessfully(), inboundTransfer.isCancelled());
                switch (completedSegments.size()) {
                    case 0: {
                        completedSegments = Collections.singleton(segment);
                        continue block12;
                    }
                    case 1: {
                        completedSegments = new HashSet(completedSegments);
                    }
                }
                completedSegments.add(segment);
            }
        }
        if (completedSegments.isEmpty()) {
            log.tracef("Not requesting any values yet because no segments have been completed.", new Object[0]);
        } else if (inboundTransfer.isCompletedSuccessfully()) {
            Set finalCompletedSegments = completedSegments;
            log.tracef("Requesting values from segments %s, for in-memory keys", (Object)finalCompletedSegments);
            readCh = this.cacheTopology.getReadConsistentHash();
            for (InternalCacheEntry ice : this.dataContainer) {
                Object key = ice.getKey();
                int segmentId = readCh.getSegment(key);
                if (!finalCompletedSegments.contains(segmentId)) continue;
                if (ice.getMetadata() instanceof RemoteMetadata) {
                    Address backup = ((RemoteMetadata)ice.getMetadata()).getAddress();
                    this.retrieveEntry(ice.getKey(), backup);
                    for (Address member : this.cacheTopology.getActualMembers()) {
                        if (member.equals(backup)) continue;
                        this.invalidate(ice.getKey(), ice.getMetadata().version(), member);
                    }
                    continue;
                }
                this.backupEntry(ice);
                for (Address member : this.nonBackupAddresses) {
                    this.invalidate(ice.getKey(), ice.getMetadata().version(), member);
                }
            }
            AdvancedCacheLoader stProvider = this.persistenceManager.getStateTransferProvider();
            if (stProvider != null) {
                try {
                    CollectionKeyFilter<Object> filter = new CollectionKeyFilter<Object>(new ReadOnlyDataContainerBackedKeySet(this.dataContainer));
                    AdvancedCacheLoader.CacheLoaderTask task = (arg_0, arg_1) -> this.lambda$onTaskCompletion$3((ConsistentHash)((Object)readCh), finalCompletedSegments, arg_0, arg_1);
                    stProvider.process(filter, task, new WithinThreadExecutor(), true, true);
                }
                catch (CacheException e) {
                    log.failedLoadingKeysFromCacheStore(e);
                }
            }
        }
        boolean lastTransfer = false;
        readCh = this.transferMapsLock;
        synchronized (readCh) {
            this.inboundSegments.removeAll(completedSegments);
            log.tracef("Unfinished inbound segments: " + this.inboundSegments, new Object[0]);
            if (this.inboundSegments.isEmpty()) {
                lastTransfer = true;
            }
        }
        if (lastTransfer) {
            for (Map.Entry pair : this.retrievedEntries.entrySet()) {
                BlockingQueue queue = (BlockingQueue)pair.getValue();
                ArrayList<Object> keys = new ArrayList<Object>(queue.size());
                queue.drainTo(keys);
                if (keys.isEmpty()) continue;
                this.getValuesAndApply((Address)pair.getKey(), keys);
            }
            ArrayList<InternalCacheEntry> entries = new ArrayList<InternalCacheEntry>(this.backupQueue.size());
            this.backupQueue.drainTo(entries);
            if (!entries.isEmpty()) {
                this.backupEntries(entries);
            }
            for (Map.Entry pair : this.invalidations.entrySet()) {
                BlockingQueue queue = (BlockingQueue)pair.getValue();
                ArrayList<KeyAndVersion> list = new ArrayList<KeyAndVersion>(queue.size());
                queue.drainTo(list);
                if (list.isEmpty()) continue;
                this.invalidate(list, (Address)pair.getKey());
            }
        }
        this.removeTransfer(inboundTransfer);
        if (this.chunkCounter.get() == 0L) {
            this.notifyEndOfStateTransferIfNeeded();
        }
    }

    private <T> List<T> offerAndDrain(BlockingQueue<T> queue, T element) {
        ArrayList<T> list = null;
        if (queue.offer(element)) {
            if (queue.size() >= this.chunkSize) {
                list = new ArrayList(this.chunkSize);
                queue.drainTo(list, this.chunkSize);
            }
        } else {
            list = new ArrayList<T>(this.chunkSize);
            list.add(element);
            queue.drainTo(list, this.chunkSize - 1);
        }
        return list;
    }

    private void invalidate(Object key, EntryVersion version, Address member) {
        BlockingQueue queue = this.invalidations.computeIfAbsent(member, m -> new ArrayBlockingQueue(this.chunkSize));
        List<KeyAndVersion> list = this.offerAndDrain(queue, new KeyAndVersion(key, version));
        if (list != null && !list.isEmpty()) {
            this.invalidate(list, member);
        }
    }

    private void invalidate(List<KeyAndVersion> list, Address member) {
        Object[] keys = new Object[list.size()];
        int[] topologyIds = new int[list.size()];
        long[] versions = new long[list.size()];
        int i = 0;
        for (KeyAndVersion pair : list) {
            keys[i] = pair.key;
            SimpleClusteredVersion version = (SimpleClusteredVersion)pair.version;
            topologyIds[i] = version.topologyId;
            versions[i] = version.version;
            ++i;
        }
        this.chunkCounter.incrementAndGet();
        InvalidateVersionsCommand ivc = this.commandsFactory.buildInvalidateVersionsCommand(this.cacheTopology.getTopologyId(), keys, topologyIds, versions, true);
        this.rpcManager.invokeRemotelyAsync(Collections.singleton(member), ivc, this.synchronousRpcOptions).whenComplete((responses, t) -> {
            if (t != null) {
                log.failedInvalidatingRemoteCache((Throwable)t);
            }
            if (this.chunkCounter.decrementAndGet() == 0L) {
                this.notifyEndOfStateTransferIfNeeded();
            }
        });
    }

    private void backupEntry(InternalCacheEntry entry) {
        List<InternalCacheEntry> entries = this.offerAndDrain(this.backupQueue, entry);
        if (entries != null && !entries.isEmpty()) {
            this.backupEntries(entries);
        }
    }

    private void backupEntries(List<InternalCacheEntry> entries) {
        this.chunkCounter.incrementAndGet();
        HashMap map = new HashMap();
        for (InternalCacheEntry entry : entries) {
            map.put(entry.getKey(), entry.toInternalCacheValue());
        }
        PutMapCommand putMapCommand = this.commandsFactory.buildPutMapCommand(map, null, STATE_TRANSFER_FLAGS);
        this.rpcManager.invokeRemotelyAsync(this.backupAddress, putMapCommand, this.synchronousRpcOptions).whenComplete((responseMap, throwable) -> {
            try {
                if (throwable != null) {
                    log.failedOutBoundTransferExecution((Throwable)throwable);
                }
            }
            finally {
                if (this.chunkCounter.decrementAndGet() == 0L) {
                    this.notifyEndOfStateTransferIfNeeded();
                }
            }
        });
    }

    private void retrieveEntry(Object key, Address address) {
        BlockingQueue queue = this.retrievedEntries.computeIfAbsent(address, k -> new ArrayBlockingQueue(this.chunkSize));
        List<Object> keys = this.offerAndDrain(queue, key);
        if (keys != null && !keys.isEmpty()) {
            this.getValuesAndApply(address, keys);
        }
    }

    private void getValuesAndApply(Address address, List<Object> keys) {
        this.chunkCounter.incrementAndGet();
        ClusteredGetAllCommand command = this.commandsFactory.buildClusteredGetAllCommand(keys, SKIP_OWNERSHIP_FLAGS, null);
        this.rpcManager.invokeRemotelyAsync(Collections.singleton(address), command, this.rpcManager.getDefaultRpcOptions(true)).whenComplete((responseMap, throwable) -> {
            try {
                if (throwable != null) {
                    throw log.exceptionProcessingEntryRetrievalValues((Throwable)throwable);
                }
                this.applyValues(address, keys, (Map<Address, Response>)responseMap);
            }
            catch (Throwable t) {
                log.failedProcessingValuesDuringRebalance(t);
                throw t;
            }
            finally {
                if (this.chunkCounter.decrementAndGet() == 0L) {
                    this.notifyEndOfStateTransferIfNeeded();
                }
            }
        });
    }

    private void applyValues(Address address, List<Object> keys, Map<Address, Response> responseMap) {
        Response response = responseMap.get(address);
        if (response == null) {
            throw new CacheException("Did not get response from " + address + ", got " + responseMap);
        }
        if (!response.isSuccessful()) {
            throw new CacheException("Response from " + address + " is unsuccessful: " + response);
        }
        InternalCacheValue[] values = (InternalCacheValue[])((SuccessfulResponse)response).getResponseValue();
        if (values == null) {
            throw new IllegalStateException();
        }
        for (int i = 0; i < keys.size(); ++i) {
            Object key = keys.get(i);
            InternalCacheValue icv = values[i];
            PutKeyValueCommand put = this.commandsFactory.buildPutKeyValueCommand(key, icv.getValue(), icv.getMetadata(), STATE_TRANSFER_FLAGS);
            try {
                this.interceptorChain.invoke(this.icf.createSingleKeyNonTxInvocationContext(), put);
                continue;
            }
            catch (Exception e) {
                if (!this.cache.getStatus().allowInvocations()) {
                    log.debugf("Cache %s is shutting down, stopping state transfer", (Object)this.cacheName);
                    break;
                }
                log.problemApplyingStateForKey(e.getMessage(), key, e);
            }
        }
    }

    @Override
    public void stopApplyingState(int topologyId) {
        this.svm.notifyValueTransferFinished();
        super.stopApplyingState(topologyId);
    }

    @Override
    protected void removeStaleData(Set<Integer> removedSegments) throws InterruptedException {
    }

    private Address getNextMember(CacheTopology cacheTopology) {
        Address myAddress = this.rpcManager.getAddress();
        List<Address> members = cacheTopology.getActualMembers();
        if (members.size() <= 1) {
            return null;
        }
        Iterator<Address> it = members.iterator();
        while (it.hasNext()) {
            Address member = it.next();
            if (!member.equals(myAddress)) continue;
            if (it.hasNext()) {
                return it.next();
            }
            return members.get(0);
        }
        return null;
    }

    private /* synthetic */ void lambda$onTaskCompletion$3(ConsistentHash readCh, Set finalCompletedSegments, MarshalledEntry me, AdvancedCacheLoader.TaskContext taskContext) throws InterruptedException {
        int segmentId = readCh.getSegment(me.getKey());
        if (finalCompletedSegments.contains(segmentId)) {
            try {
                InternalMetadata metadata = me.getMetadata();
                if (metadata instanceof RemoteMetadata) {
                    Address backup = ((RemoteMetadata)metadata).getAddress();
                    this.retrieveEntry(me.getKey(), backup);
                    for (Address member : this.cacheTopology.getActualMembers()) {
                        if (member.equals(backup)) continue;
                        this.invalidate(me.getKey(), metadata.version(), member);
                    }
                } else {
                    this.backupEntry(this.entryFactory.create(me.getKey(), me.getValue(), me.getMetadata()));
                    for (Address member : this.nonBackupAddresses) {
                        this.invalidate(me.getKey(), metadata.version(), member);
                    }
                }
            }
            catch (CacheException e) {
                log.failedLoadingValueFromCacheStore(me.getKey(), e);
            }
        }
    }

    protected static class KeyAndVersion {
        public final Object key;
        public final EntryVersion version;

        public KeyAndVersion(Object key, EntryVersion version) {
            this.key = key;
            this.version = version;
        }
    }
}

