/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.protocols.gossip.map;

import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.BaseEncoding;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.primitive.PrimitiveManagementService;
import io.atomix.primitive.protocol.map.MapDelegate;
import io.atomix.primitive.protocol.map.MapDelegateEvent;
import io.atomix.primitive.protocol.map.MapDelegateEventListener;
import io.atomix.protocols.gossip.AntiEntropyProtocolConfig;
import io.atomix.protocols.gossip.PeerSelector;
import io.atomix.protocols.gossip.TimestampProvider;
import io.atomix.protocols.gossip.map.AntiEntropyAdvertisement;
import io.atomix.protocols.gossip.map.AntiEntropyResponse;
import io.atomix.protocols.gossip.map.MapValue;
import io.atomix.protocols.gossip.map.UpdateEntry;
import io.atomix.protocols.gossip.map.UpdateRequest;
import io.atomix.utils.concurrent.AbstractAccumulator;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.event.Event;
import io.atomix.utils.misc.SlidingWindowCounter;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.serializer.Namespaces;
import io.atomix.utils.serializer.Serializer;
import io.atomix.utils.time.LogicalTimestamp;
import io.atomix.utils.time.Timestamp;
import io.atomix.utils.time.WallClockTimestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AntiEntropyMapDelegate<K, V>
implements MapDelegate<K, V> {
    private static final Logger log = LoggerFactory.getLogger(AntiEntropyMapDelegate.class);
    private static final String ERROR_DESTROYED = " map is already destroyed";
    private static final String ERROR_NULL_KEY = "Key cannot be null";
    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
    private static final int WINDOW_SIZE = 5;
    private static final int HIGH_LOAD_THRESHOLD = 2;
    private static final int LOAD_WINDOW = 2;
    private final Map<String, MapValue> items;
    private final ClusterCommunicationService clusterCommunicator;
    private final ClusterMembershipService membershipService;
    private final Serializer entrySerializer;
    private final Serializer serializer;
    private final TimestampProvider<Map.Entry<K, V>> timestampProvider;
    private final String bootstrapMessageSubject;
    private final String initializeMessageSubject;
    private final String updateMessageSubject;
    private final String antiEntropyAdvertisementSubject;
    private final String updateRequestSubject;
    private final Set<MapDelegateEventListener<K, V>> listeners = Sets.newCopyOnWriteArraySet();
    private final ExecutorService executor;
    private final ScheduledExecutorService backgroundExecutor;
    private final PeerSelector<Map.Entry<K, V>> peerUpdateFunction;
    private final ExecutorService communicationExecutor;
    private final Map<MemberId, EventAccumulator> senderPending;
    private final Map<MemberId, Long> antiEntropyTimes = Maps.newConcurrentMap();
    private final String mapName;
    private final String destroyedMessage;
    private final long initialDelaySec = 5L;
    private final boolean tombstonesDisabled;
    private final Supplier<List<MemberId>> peersSupplier;
    private final Supplier<List<MemberId>> bootstrapPeersSupplier;
    private final MemberId localMemberId;
    private long previousTombstonePurgeTime;
    private volatile boolean closed = false;
    private SlidingWindowCounter counter = new SlidingWindowCounter(5);
    private static final int DEFAULT_MAX_EVENTS = 1000;
    private static final int DEFAULT_MAX_IDLE_MS = 10;
    private static final int DEFAULT_MAX_BATCH_MS = 50;
    private static final Timer TIMER = new Timer("atomix-anti-entropy-map-sender-events");

    public AntiEntropyMapDelegate(String name, Serializer entrySerializer, AntiEntropyProtocolConfig config, PrimitiveManagementService managementService) {
        this.localMemberId = managementService.getMembershipService().getLocalMember().id();
        this.mapName = name;
        this.entrySerializer = entrySerializer;
        this.serializer = Serializer.using((Namespace)Namespace.builder().nextId(600).register(Namespaces.BASIC).register(new Class[]{LogicalTimestamp.class}).register(new Class[]{WallClockTimestamp.class}).register(new Class[]{AntiEntropyAdvertisement.class}).register(new Class[]{AntiEntropyResponse.class}).register(new Class[]{UpdateEntry.class}).register(new Class[]{MapValue.class}).register(new Class[]{MapValue.Digest.class}).register(new Class[]{UpdateRequest.class}).register(new Class[]{MemberId.class}).build(name + "-anti-entropy-map"));
        this.items = Maps.newConcurrentMap();
        this.senderPending = Maps.newConcurrentMap();
        this.destroyedMessage = this.mapName + ERROR_DESTROYED;
        this.clusterCommunicator = managementService.getCommunicationService();
        this.membershipService = managementService.getMembershipService();
        this.timestampProvider = config.getTimestampProvider();
        List peers = config.getPeers() != null ? config.getPeers().stream().map(MemberId::from).collect(Collectors.toList()) : null;
        this.peersSupplier = () -> peers != null ? peers : managementService.getMembershipService().getMembers().stream().map(Member::id).sorted().collect(Collectors.toList());
        this.bootstrapPeersSupplier = this.peersSupplier;
        PeerSelector peerSelector = config.getPeerSelector();
        this.peerUpdateFunction = (entry, m) -> {
            Collection<MemberId> selected = peerSelector.select(entry, m);
            return this.peersSupplier.get().stream().filter(selected::contains).collect(Collectors.toList());
        };
        this.executor = Executors.newFixedThreadPool(8, Threads.namedThreads((String)("atomix-anti-entropy-map-" + this.mapName + "-fg-%d"), (Logger)log));
        this.communicationExecutor = Executors.newFixedThreadPool(8, Threads.namedThreads((String)("atomix-anti-entropy-map-" + this.mapName + "-publish-%d"), (Logger)log));
        this.backgroundExecutor = Executors.newSingleThreadScheduledExecutor(Threads.namedThreads((String)("atomix-anti-entropy-map-" + this.mapName + "-bg-%d"), (Logger)log));
        this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement, 5L, config.getAntiEntropyInterval().toMillis(), TimeUnit.MILLISECONDS);
        this.bootstrapMessageSubject = "atomix-gossip-map-" + this.mapName + "-bootstrap";
        this.clusterCommunicator.subscribe(this.bootstrapMessageSubject, arg_0 -> ((Serializer)this.serializer).decode(arg_0), this::handleBootstrap, arg_0 -> ((Serializer)this.serializer).encode(arg_0));
        this.initializeMessageSubject = "atomix-gossip-map-" + this.mapName + "-initialize";
        this.clusterCommunicator.subscribe(this.initializeMessageSubject, arg_0 -> ((Serializer)this.serializer).decode(arg_0), u -> {
            this.processUpdates((Collection<UpdateEntry>)u);
            return null;
        }, arg_0 -> ((Serializer)this.serializer).encode(arg_0), (Executor)this.executor);
        this.updateMessageSubject = "atomix-gossip-map-" + this.mapName + "-update";
        this.clusterCommunicator.subscribe(this.updateMessageSubject, arg_0 -> ((Serializer)this.serializer).decode(arg_0), this::processUpdates, (Executor)this.executor);
        this.antiEntropyAdvertisementSubject = "atomix-gossip-map-" + this.mapName + "-anti-entropy";
        this.clusterCommunicator.subscribe(this.antiEntropyAdvertisementSubject, arg_0 -> ((Serializer)this.serializer).decode(arg_0), this::handleAntiEntropyAdvertisement, arg_0 -> ((Serializer)this.serializer).encode(arg_0), (Executor)this.backgroundExecutor);
        this.updateRequestSubject = "atomix-gossip-map-" + this.mapName + "-update-request";
        this.clusterCommunicator.subscribe(this.updateRequestSubject, arg_0 -> ((Serializer)this.serializer).decode(arg_0), this::handleUpdateRequests, (Executor)this.backgroundExecutor);
        if (!config.isTombstonesDisabled()) {
            this.previousTombstonePurgeTime = 0L;
            this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones, 5L, config.getAntiEntropyInterval().toMillis(), TimeUnit.MILLISECONDS);
        }
        this.tombstonesDisabled = config.isTombstonesDisabled();
        this.bootstrap();
    }

    private String encodeKey(Object key) {
        return BaseEncoding.base16().encode(this.entrySerializer.encode(key));
    }

    private byte[] encodeValue(Object value) {
        return value != null ? this.entrySerializer.encode(value) : null;
    }

    private K decodeKey(String key) {
        return (K)this.entrySerializer.decode(BaseEncoding.base16().decode((CharSequence)key));
    }

    private V decodeValue(byte[] value) {
        return (V)(value != null ? this.entrySerializer.decode(value) : null);
    }

    public int size() {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        return Maps.filterValues(this.items, MapValue::isAlive).size();
    }

    public boolean isEmpty() {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        return this.size() == 0;
    }

    public boolean containsKey(Object key) {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        Preconditions.checkNotNull((Object)key, (Object)ERROR_NULL_KEY);
        return this.get(key) != null;
    }

    public boolean containsValue(Object value) {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        Preconditions.checkNotNull((Object)value, (Object)ERROR_NULL_VALUE);
        return this.items.values().stream().filter(MapValue::isAlive).anyMatch(v -> Arrays.equals(this.encodeValue(value), v.get()));
    }

    public V get(Object key) {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        Preconditions.checkNotNull((Object)key, (Object)ERROR_NULL_KEY);
        MapValue value = this.items.get(this.encodeKey(key));
        return value == null || value.isTombstone() ? null : (V)value.get(this::decodeValue);
    }

    public V put(K key, V value) {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        Preconditions.checkNotNull(key, (Object)ERROR_NULL_KEY);
        Preconditions.checkNotNull(value, (Object)ERROR_NULL_VALUE);
        String encodedKey = this.encodeKey(key);
        byte[] encodedValue = this.encodeValue(value);
        MapValue newValue = new MapValue(encodedValue, this.timestampProvider.get(Maps.immutableEntry(key, value)));
        this.counter.incrementCount();
        AtomicReference oldValue = new AtomicReference();
        AtomicBoolean updated = new AtomicBoolean(false);
        this.items.compute(encodedKey, (k, existing) -> {
            if (existing == null || newValue.isNewerThan((MapValue)existing)) {
                updated.set(true);
                oldValue.set(existing != null ? existing.get() : null);
                return newValue;
            }
            return existing;
        });
        if (updated.get()) {
            this.notifyPeers(new UpdateEntry(encodedKey, newValue), this.peerUpdateFunction.select(Maps.immutableEntry(key, value), this.membershipService));
            if (oldValue.get() == null) {
                this.notifyListeners(new MapDelegateEvent(MapDelegateEvent.Type.INSERT, key, value));
            } else {
                this.notifyListeners(new MapDelegateEvent(MapDelegateEvent.Type.UPDATE, key, value));
            }
            return this.decodeValue((byte[])oldValue.get());
        }
        return value;
    }

    public V remove(Object key) {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        Preconditions.checkNotNull((Object)key, (Object)ERROR_NULL_KEY);
        return this.removeAndNotify(key, null);
    }

    public boolean remove(Object key, Object value) {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        Preconditions.checkNotNull((Object)key, (Object)ERROR_NULL_KEY);
        Preconditions.checkNotNull((Object)value, (Object)ERROR_NULL_VALUE);
        return this.removeAndNotify(key, value) != null;
    }

    private V removeAndNotify(K key, V value) {
        String encodedKey = this.encodeKey(key);
        byte[] encodedValue = this.encodeValue(value);
        Timestamp timestamp = this.timestampProvider.get(Maps.immutableEntry(key, value));
        Optional<MapValue> tombstone = this.tombstonesDisabled || timestamp == null ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
        MapValue previousValue = this.removeInternal(encodedKey, Optional.ofNullable(encodedValue), tombstone);
        V decodedPreviousValue = null;
        if (previousValue != null) {
            decodedPreviousValue = previousValue.get(this::decodeValue);
            this.notifyPeers(new UpdateEntry(encodedKey, tombstone.orElse(null)), this.peerUpdateFunction.select(Maps.immutableEntry(key, decodedPreviousValue), this.membershipService));
            if (previousValue.isAlive()) {
                this.notifyListeners(new MapDelegateEvent(MapDelegateEvent.Type.REMOVE, key, decodedPreviousValue));
            }
        }
        return decodedPreviousValue;
    }

    private MapValue removeInternal(String key, Optional<byte[]> value, Optional<MapValue> tombstone) {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        Preconditions.checkNotNull((Object)key, (Object)ERROR_NULL_KEY);
        Preconditions.checkNotNull(value, (Object)ERROR_NULL_VALUE);
        tombstone.ifPresent(v -> Preconditions.checkState((boolean)v.isTombstone()));
        this.counter.incrementCount();
        AtomicBoolean updated = new AtomicBoolean(false);
        AtomicReference previousValue = new AtomicReference();
        this.items.compute(key, (k, existing) -> {
            boolean valueMatches = true;
            if (value.isPresent() && existing != null && existing.isAlive()) {
                valueMatches = Arrays.equals((byte[])value.get(), existing.get());
            }
            if (existing == null) {
                log.trace("ECMap Remove: Existing value for key {} is already null", k);
            }
            if (valueMatches) {
                if (existing == null) {
                    updated.set(tombstone.isPresent());
                } else {
                    updated.set(!tombstone.isPresent() || ((MapValue)tombstone.get()).isNewerThan((MapValue)existing));
                }
            }
            if (updated.get()) {
                previousValue.set(existing);
                return tombstone.orElse(null);
            }
            return existing;
        });
        return (MapValue)previousValue.get();
    }

    public V compute(K key, BiFunction<? super K, ? super V, ? extends V> recomputeFunction) {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        Preconditions.checkNotNull(key, (Object)ERROR_NULL_KEY);
        Preconditions.checkNotNull(recomputeFunction, (Object)"Recompute function cannot be null");
        String encodedKey = this.encodeKey(key);
        AtomicReference update = new AtomicReference();
        AtomicReference previousValue = new AtomicReference();
        MapValue computedValue = this.items.compute(encodedKey, (k, mv) -> {
            previousValue.set(mv);
            Object newRawValue = recomputeFunction.apply((K)key, (V)(mv == null ? null : (Object)mv.get(this::decodeValue)));
            byte[] newEncodedValue = this.encodeValue(newRawValue);
            if (mv != null && Arrays.equals(newEncodedValue, mv.get())) {
                return mv;
            }
            MapValue newValue = new MapValue(newEncodedValue, this.timestampProvider.get(Maps.immutableEntry((Object)key, newRawValue)));
            if (mv == null) {
                update.set(MapDelegateEvent.Type.INSERT);
                return newValue;
            }
            if (newValue.isNewerThan((MapValue)mv)) {
                update.set(MapDelegateEvent.Type.UPDATE);
                return newValue;
            }
            return mv;
        });
        if (update.get() != null) {
            V value;
            MapDelegateEvent.Type updateType;
            this.notifyPeers(new UpdateEntry(encodedKey, computedValue), this.peerUpdateFunction.select(Maps.immutableEntry(key, computedValue.get(this::decodeValue)), this.membershipService));
            MapDelegateEvent.Type type = updateType = computedValue.isTombstone() ? MapDelegateEvent.Type.REMOVE : (MapDelegateEvent.Type)update.get();
            V v = computedValue.isTombstone() ? (previousValue.get() == null ? null : (V)((MapValue)previousValue.get()).get(this::decodeValue)) : (value = (V)computedValue.get(this::decodeValue));
            if (value != null) {
                this.notifyListeners(new MapDelegateEvent(updateType, key, value));
            }
            return value;
        }
        return computedValue.get(this::decodeValue);
    }

    public void putAll(Map<? extends K, ? extends V> m) {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        m.forEach(this::put);
    }

    public void clear() {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        Maps.filterValues(this.items, MapValue::isAlive).forEach((k, v) -> this.remove(this.decodeKey((String)k)));
    }

    public Set<K> keySet() {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        return Maps.filterValues(this.items, MapValue::isAlive).keySet().stream().map(this::decodeKey).collect(Collectors.toSet());
    }

    public Collection<V> values() {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        return Collections2.transform(Maps.filterValues(this.items, MapValue::isAlive).values(), value -> value.get(this::decodeValue));
    }

    public Set<Map.Entry<K, V>> entrySet() {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        return Maps.filterValues(this.items, MapValue::isAlive).entrySet().stream().map(e -> Pair.of(this.decodeKey((String)e.getKey()), this.decodeValue(((MapValue)e.getValue()).get()))).collect(Collectors.toSet());
    }

    public void addListener(MapDelegateEventListener<K, V> listener) {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        this.listeners.add((MapDelegateEventListener)Preconditions.checkNotNull(listener));
        this.items.forEach((k, v) -> {
            if (v.isAlive()) {
                listener.event((Event)new MapDelegateEvent(MapDelegateEvent.Type.INSERT, this.decodeKey((String)k), v.get(this::decodeValue)));
            }
        });
    }

    public void removeListener(MapDelegateEventListener<K, V> listener) {
        Preconditions.checkState((!this.closed ? 1 : 0) != 0, (Object)this.destroyedMessage);
        this.listeners.remove(Preconditions.checkNotNull(listener));
    }

    public void close() {
        this.closed = true;
        this.executor.shutdown();
        this.backgroundExecutor.shutdown();
        this.communicationExecutor.shutdown();
        this.listeners.clear();
        this.clusterCommunicator.unsubscribe(this.bootstrapMessageSubject);
        this.clusterCommunicator.unsubscribe(this.initializeMessageSubject);
        this.clusterCommunicator.unsubscribe(this.updateMessageSubject);
        this.clusterCommunicator.unsubscribe(this.updateRequestSubject);
        this.clusterCommunicator.unsubscribe(this.antiEntropyAdvertisementSubject);
    }

    private void notifyListeners(MapDelegateEvent<K, V> event) {
        this.listeners.forEach(listener -> listener.event((Event)event));
    }

    private void notifyPeers(UpdateEntry event, Collection<MemberId> peers) {
        this.queueUpdate(event, peers);
    }

    private void queueUpdate(UpdateEntry event, Collection<MemberId> peers) {
        if (peers == null) {
            return;
        }
        peers.forEach(node -> this.senderPending.computeIfAbsent((MemberId)node, unusedKey -> new EventAccumulator((MemberId)node)).add(event));
    }

    private boolean underHighLoad() {
        return this.counter.get(2) > 2L;
    }

    private void sendAdvertisement() {
        try {
            if (this.underHighLoad() || this.closed) {
                return;
            }
            this.pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
        }
        catch (Exception e) {
            log.error("Exception thrown while sending advertisement", (Throwable)e);
        }
    }

    private Optional<MemberId> pickRandomActivePeer() {
        List<MemberId> activePeers = this.peersSupplier.get();
        Collections.shuffle(activePeers);
        return activePeers.stream().findFirst();
    }

    private void sendAdvertisementToPeer(MemberId peer) {
        long adCreationTime = System.currentTimeMillis();
        AntiEntropyAdvertisement ad = this.createAdvertisement();
        this.clusterCommunicator.send(this.antiEntropyAdvertisementSubject, (Object)ad, arg_0 -> ((Serializer)this.serializer).encode(arg_0), arg_0 -> ((Serializer)this.serializer).decode(arg_0), peer).whenComplete((result, error) -> {
            if (error != null) {
                log.debug("Failed to send anti-entropy advertisement to {}: {}", (Object)peer, (Object)error.getMessage());
            } else if (result == AntiEntropyResponse.PROCESSED) {
                this.antiEntropyTimes.put(peer, adCreationTime);
            }
        });
    }

    private void sendUpdateRequestToPeer(MemberId peer, Set<String> keys) {
        UpdateRequest<String> request = new UpdateRequest<String>(this.localMemberId, keys);
        this.clusterCommunicator.unicast(this.updateRequestSubject, request, arg_0 -> ((Serializer)this.serializer).encode(arg_0), peer).whenComplete((result, error) -> {
            if (error != null) {
                log.debug("Failed to send update request to {}: {}", (Object)peer, (Object)error.getMessage());
            }
        });
    }

    private AntiEntropyAdvertisement createAdvertisement() {
        return new AntiEntropyAdvertisement(this.localMemberId, (Map<String, MapValue.Digest>)ImmutableMap.copyOf((Map)Maps.transformValues(this.items, MapValue::digest)));
    }

    private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement ad) {
        if (this.closed || this.underHighLoad()) {
            return AntiEntropyResponse.IGNORED;
        }
        try {
            if (log.isTraceEnabled()) {
                log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it", new Object[]{ad.sender(), this.mapName, ad.digest().size()});
            }
            this.antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
        }
        catch (Exception e) {
            log.warn("Error handling anti-entropy advertisement", (Throwable)e);
            return AntiEntropyResponse.FAILED;
        }
        return AntiEntropyResponse.PROCESSED;
    }

    private List<MapDelegateEvent<K, V>> antiEntropyCheckLocalItems(AntiEntropyAdvertisement ad) {
        LinkedList externalEvents = Lists.newLinkedList();
        MemberId sender = ad.sender();
        ImmutableList peers = ImmutableList.of((Object)sender);
        HashSet<String> staleOrMissing = new HashSet<String>();
        HashSet<String> locallyUnknown = new HashSet<String>(ad.digest().keySet());
        this.items.forEach((arg_0, arg_1) -> this.lambda$antiEntropyCheckLocalItems$17(locallyUnknown, ad, (List)peers, externalEvents, staleOrMissing, arg_0, arg_1));
        staleOrMissing.addAll(locallyUnknown);
        this.sendUpdateRequestToPeer(sender, staleOrMissing);
        return externalEvents;
    }

    private void handleUpdateRequests(UpdateRequest<String> request) {
        Set<String> keys = request.keys();
        MemberId sender = request.sender();
        ImmutableList peers = ImmutableList.of((Object)sender);
        keys.forEach(arg_0 -> this.lambda$handleUpdateRequests$18((List)peers, arg_0));
    }

    private void purgeTombstones() {
        long currentSafeTombstonePurgeTime = this.peersSupplier.get().stream().map(id -> this.antiEntropyTimes.getOrDefault(id, 0L)).reduce(Math::min).orElse(0L);
        if (currentSafeTombstonePurgeTime == this.previousTombstonePurgeTime) {
            return;
        }
        List<Map.Entry> tombStonesToDelete = this.items.entrySet().stream().filter(e -> ((MapValue)e.getValue()).isTombstone()).filter(e -> ((MapValue)e.getValue()).creationTime() <= currentSafeTombstonePurgeTime).collect(Collectors.toList());
        this.previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
        tombStonesToDelete.forEach(entry -> this.items.remove(entry.getKey(), entry.getValue()));
    }

    private void processUpdates(Collection<UpdateEntry> updates) {
        if (this.closed) {
            return;
        }
        updates.forEach(update -> {
            MapValue value;
            String key = update.key();
            MapValue mapValue = value = update.value() == null ? null : update.value().copy();
            if (value == null || value.isTombstone()) {
                MapValue previousValue = this.removeInternal(key, Optional.empty(), Optional.ofNullable(value));
                if (previousValue != null && previousValue.isAlive()) {
                    this.notifyListeners(new MapDelegateEvent(MapDelegateEvent.Type.REMOVE, this.decodeKey(key), previousValue.get(this::decodeValue)));
                }
            } else {
                this.counter.incrementCount();
                AtomicReference oldValue = new AtomicReference();
                AtomicBoolean updated = new AtomicBoolean(false);
                this.items.compute(key, (k, existing) -> {
                    if (existing == null || value.isNewerThan((MapValue)existing)) {
                        updated.set(true);
                        oldValue.set(existing != null ? existing.get() : null);
                        return value;
                    }
                    return existing;
                });
                if (updated.get()) {
                    if (oldValue.get() == null) {
                        this.notifyListeners(new MapDelegateEvent(MapDelegateEvent.Type.INSERT, this.decodeKey(key), this.decodeValue(value.get())));
                    } else {
                        this.notifyListeners(new MapDelegateEvent(MapDelegateEvent.Type.UPDATE, this.decodeKey(key), this.decodeValue(value.get())));
                    }
                }
            }
        });
    }

    private void bootstrap() {
        List<MemberId> activePeers = this.bootstrapPeersSupplier.get();
        if (activePeers.isEmpty()) {
            return;
        }
        try {
            this.requestBootstrapFromPeers(activePeers).get(5000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.debug("Failed to bootstrap ec map {}: {}", (Object)this.mapName, (Object)ExceptionUtils.getStackTrace((Throwable)e));
        }
    }

    private CompletableFuture<Void> requestBootstrapFromPeers(List<MemberId> peers) {
        if (peers.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        int totalPeers = peers.size();
        AtomicBoolean successful = new AtomicBoolean();
        AtomicInteger totalCount = new AtomicInteger();
        AtomicReference lastError = new AtomicReference();
        for (MemberId peer : peers) {
            this.requestBootstrapFromPeer(peer).whenComplete((result, error) -> {
                if (error == null) {
                    Throwable e;
                    if (successful.compareAndSet(false, true)) {
                        future.complete(null);
                    } else if (totalCount.incrementAndGet() == totalPeers && (e = (Throwable)lastError.get()) != null) {
                        future.completeExceptionally(e);
                    }
                } else if (!successful.get() && totalCount.incrementAndGet() == totalPeers) {
                    future.completeExceptionally((Throwable)error);
                } else {
                    lastError.set(error);
                }
            });
        }
        return future;
    }

    private CompletableFuture<Void> requestBootstrapFromPeer(MemberId peer) {
        log.trace("Sending bootstrap request to {} for {}", (Object)peer, (Object)this.mapName);
        return this.clusterCommunicator.send(this.bootstrapMessageSubject, (Object)this.localMemberId, arg_0 -> ((Serializer)this.serializer).encode(arg_0), arg_0 -> ((Serializer)this.serializer).decode(arg_0), peer).whenComplete((updates, error) -> {
            if (error != null) {
                log.debug("Bootstrap request to {} failed: {}", (Object)peer, (Object)error.getMessage());
            }
        });
    }

    private CompletableFuture<Void> handleBootstrap(MemberId peer) {
        log.trace("Received bootstrap request from {} for {}", (Object)peer, (Object)this.bootstrapMessageSubject);
        Function<List, CompletableFuture> sendUpdates = updates -> {
            log.trace("Initializing {} with {} entries", (Object)peer, (Object)updates.size());
            return this.clusterCommunicator.send(this.initializeMessageSubject, (Object)ImmutableList.copyOf((Collection)updates), arg_0 -> ((Serializer)this.serializer).encode(arg_0), arg_0 -> ((Serializer)this.serializer).decode(arg_0), peer).whenComplete((result, error) -> {
                if (error != null) {
                    log.debug("Failed to initialize {}", (Object)peer, error);
                }
            });
        };
        ArrayList futures = Lists.newArrayList();
        ArrayList<UpdateEntry> updates2 = Lists.newArrayList();
        for (Map.Entry<String, MapValue> entry : this.items.entrySet()) {
            String key = entry.getKey();
            MapValue value = entry.getValue();
            if (!value.isAlive()) continue;
            updates2.add(new UpdateEntry(key, value));
            if (updates2.size() != 1000) continue;
            futures.add(sendUpdates.apply(updates2));
            updates2 = new ArrayList<UpdateEntry>();
        }
        if (!updates2.isEmpty()) {
            futures.add(sendUpdates.apply(updates2));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    }

    private /* synthetic */ void lambda$handleUpdateRequests$18(List peers, String key) {
        this.queueUpdate(new UpdateEntry(key, this.items.get(key)), peers);
    }

    private /* synthetic */ void lambda$antiEntropyCheckLocalItems$17(Set locallyUnknown, AntiEntropyAdvertisement ad, List peers, List externalEvents, Set staleOrMissing, String key, MapValue localValue) {
        locallyUnknown.remove(key);
        MapValue.Digest remoteValueDigest = ad.digest().get(key);
        if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
            this.queueUpdate(new UpdateEntry(key, localValue), peers);
        } else if (remoteValueDigest.isNewerThan(localValue.digest()) && remoteValueDigest.isTombstone()) {
            MapValue tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
            MapValue previousValue = this.removeInternal(key, Optional.empty(), Optional.of(tombstone));
            if (previousValue != null && previousValue.isAlive()) {
                externalEvents.add(new MapDelegateEvent(MapDelegateEvent.Type.REMOVE, this.decodeKey(key), previousValue.get(this::decodeValue)));
            }
        } else if (remoteValueDigest.isNewerThan(localValue.digest())) {
            staleOrMissing.add(key);
        }
    }

    private final class EventAccumulator
    extends AbstractAccumulator<UpdateEntry> {
        private final MemberId peer;

        private EventAccumulator(MemberId peer) {
            super(TIMER, 1000, 50, 10);
            this.peer = peer;
        }

        public void processItems(List<UpdateEntry> items) {
            HashMap map = Maps.newHashMap();
            items.forEach(item -> map.compute(item.key(), (key, existing) -> item.isNewerThan((UpdateEntry)existing) ? item : existing));
            AntiEntropyMapDelegate.this.communicationExecutor.execute(() -> {
                try {
                    AntiEntropyMapDelegate.this.clusterCommunicator.unicast(AntiEntropyMapDelegate.this.updateMessageSubject, (Object)ImmutableList.copyOf(map.values()), arg_0 -> ((Serializer)AntiEntropyMapDelegate.this.serializer).encode(arg_0), this.peer).whenComplete((result, error) -> {
                        if (error != null) {
                            log.debug("Failed to send to {}", (Object)this.peer, error);
                        }
                    });
                }
                catch (Exception e) {
                    log.warn("Failed to send to {}", (Object)this.peer, (Object)e);
                }
            });
        }
    }
}

