/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.protocols.gossip;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.event.AbstractListenerManager;
import io.atomix.event.Event;
import io.atomix.protocols.gossip.GossipEvent;
import io.atomix.protocols.gossip.GossipEventListener;
import io.atomix.protocols.gossip.GossipService;
import io.atomix.protocols.gossip.protocol.AntiEntropyAdvertisement;
import io.atomix.protocols.gossip.protocol.AntiEntropyProtocol;
import io.atomix.protocols.gossip.protocol.AntiEntropyResponse;
import io.atomix.protocols.gossip.protocol.GossipMessage;
import io.atomix.protocols.gossip.protocol.GossipUpdate;
import io.atomix.time.LogicalClock;
import io.atomix.time.Timestamp;
import io.atomix.utils.AbstractAccumulator;
import io.atomix.utils.Identifier;
import io.atomix.utils.SlidingWindowCounter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Gossip service that disseminates updates to peers and periodically runs an
 * anti-entropy exchange with one randomly chosen peer.
 *
 * <p>Local events submitted through {@link #process(GossipEvent)} are recorded in an
 * in-memory map keyed by subject and queued for every peer returned by the peer provider.
 * Updates received from peers are applied through the listener registered on the
 * protocol. Per-peer updates are batched by an accumulator before being sent.
 *
 * <p>All access to the {@code updates} map is guarded by this instance's monitor.
 *
 * @param <K> subject (key) type
 * @param <V> value type
 */
public class AntiEntropyService<K, V>
        extends AbstractListenerManager<GossipEvent<K, V>, GossipEventListener<K, V>>
        implements GossipService<K, V> {
    /** Number of slots handed to the sliding window counter. */
    private static final int WINDOW_SIZE = 5;
    /** Event count above which the service considers itself under high load. */
    private static final int HIGH_LOAD_THRESHOLD = 2;
    /** Number of most recent counter slots inspected by {@link #underHighLoad()}. */
    private static final int LOAD_WINDOW = 2;
    /** Accumulator setting: maximum number of items per batch. */
    private static final int DEFAULT_MAX_EVENTS = 1000;
    /** Accumulator setting: maximum idle time in milliseconds. */
    private static final int DEFAULT_MAX_IDLE_MS = 10;
    /** Accumulator setting: maximum batch time in milliseconds. */
    private static final int DEFAULT_MAX_BATCH_MS = 50;
    /** Timer shared by the accumulators of every service instance. */
    private static final Timer TIMER = new Timer("onos-ecm-sender-events");

    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final AntiEntropyProtocol<Identifier> protocol;
    private final Supplier<Collection<Identifier>> peerProvider;
    private final Executor eventExecutor;
    private final Executor communicationExecutor;
    private final boolean tombstonesDisabled;
    private final ScheduledFuture<?> updateFuture;
    /** {@code null} when tombstones are disabled, since nothing needs purging then. */
    private final ScheduledFuture<?> purgeFuture;
    /** Latest known update per subject; not thread-safe, guarded by {@code this}. */
    private final Map<K, GossipUpdate<K, V>> updates = Maps.newLinkedHashMap();
    private final LogicalClock logicalClock = new LogicalClock();
    private final Map<Identifier, UpdateAccumulator> pendingUpdates = Maps.newConcurrentMap();
    /** Wall-clock time at which the last processed advertisement to each peer was started. */
    private final Map<Identifier, Long> peerUpdateTimes = Maps.newConcurrentMap();
    private volatile boolean open = true;
    // NOTE(review): nothing in this class records events on this counter, so unless it is
    // fed elsewhere underHighLoad() can never report true - TODO confirm and wire it up.
    private final SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);

    /**
     * Creates the service, registers the gossip listener on the protocol and schedules the
     * periodic anti-entropy task (and the tombstone purge task unless tombstones are disabled).
     *
     * <p>All arguments are validated before any listener is registered or task scheduled.
     *
     * @param protocol              protocol used to talk to peers
     * @param peerProvider          supplies the current set of peers
     * @param eventExecutor         executor on which listeners are notified
     * @param communicationExecutor executor used for scheduled tasks and for sending batches
     * @param antiEntropyInterval   period between anti-entropy rounds
     * @param tombstonesDisabled    when {@code true}, removals are not retained as tombstones
     * @param purgeInterval         period between tombstone purges; only required when
     *                              tombstones are enabled
     * @throws NullPointerException if a required argument is {@code null}
     */
    public AntiEntropyService(
            AntiEntropyProtocol<Identifier> protocol,
            Supplier<Collection<Identifier>> peerProvider,
            Executor eventExecutor,
            ScheduledExecutorService communicationExecutor,
            Duration antiEntropyInterval,
            boolean tombstonesDisabled,
            Duration purgeInterval) {
        this.protocol = Preconditions.checkNotNull(protocol, "protocol cannot be null");
        this.peerProvider = Preconditions.checkNotNull(peerProvider, "peerProvider cannot be null");
        this.eventExecutor = Preconditions.checkNotNull(eventExecutor, "eventExecutor cannot be null");
        this.communicationExecutor =
                Preconditions.checkNotNull(communicationExecutor, "communicationExecutor cannot be null");
        Preconditions.checkNotNull(antiEntropyInterval, "antiEntropyInterval cannot be null");
        if (!tombstonesDisabled) {
            // Only needed when a purge task is scheduled; checked up front so a bad argument
            // cannot leave a registered listener or a scheduled task behind.
            Preconditions.checkNotNull(purgeInterval, "purgeInterval cannot be null");
        }
        this.tombstonesDisabled = tombstonesDisabled;

        protocol.registerGossipListener(this::update);
        this.updateFuture = communicationExecutor.scheduleAtFixedRate(
                this::performAntiEntropy, 0L, antiEntropyInterval.toMillis(), TimeUnit.MILLISECONDS);
        this.purgeFuture = tombstonesDisabled
                ? null
                : communicationExecutor.scheduleAtFixedRate(
                        this::purgeTombstones, 0L, purgeInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Dispatches the event to registered listeners on the event executor.
     *
     * @param event event to deliver
     */
    @Override
    protected void post(GossipEvent<K, V> event) {
        this.eventExecutor.execute(() -> super.post(event));
    }

    /**
     * Records a locally produced event, queues it for the current peers and notifies
     * local listeners.
     *
     * <p>When the resulting update is a tombstone and tombstones are disabled, the subject
     * is simply removed from the local state and peers are not notified.
     *
     * @param event local event to process
     */
    public void process(GossipEvent<K, V> event) {
        Timestamp timestamp = this.logicalClock.increment();
        GossipUpdate<K, V> update = new GossipUpdate<K, V>(event.subject(), event.value(), timestamp);
        if (update.isTombstone() && this.tombstonesDisabled) {
            synchronized (this) {
                this.updates.remove(update.subject());
            }
        } else {
            // Hold the monitor only for the map mutation; peers and listeners are
            // notified outside of it.
            synchronized (this) {
                this.updates.put(event.subject(), update);
            }
            this.notifyPeers(update);
        }
        this.post(event);
    }

    /**
     * Applies a gossip message received from a peer.
     *
     * <p>An incoming update is applied when there is no local entry for its subject, when
     * the local entry is a tombstone and the incoming update is not, or when the local
     * entry's timestamp is older. Applied updates are stored the same way
     * {@link #process(GossipEvent)} stores local ones: with tombstones disabled an incoming
     * tombstone removes the entry, otherwise the update replaces it.
     *
     * @param message message received from a peer
     */
    private synchronized void update(GossipMessage<K, V> message) {
        this.logicalClock.update(message.timestamp());
        for (GossipUpdate<K, V> update : message.updates()) {
            GossipUpdate<K, V> existing = this.updates.get(update.subject());
            boolean applies = existing == null
                    || (existing.isTombstone() && !update.isTombstone())
                    || existing.timestamp().isOlderThan(update.timestamp());
            if (!applies) {
                continue;
            }
            if (update.isTombstone() && this.tombstonesDisabled) {
                this.updates.remove(update.subject());
            } else {
                this.updates.put(update.subject(), update);
            }
            this.post(new GossipEvent<K, V>(update.creationTime(), update.subject(), update.value()));
        }
    }

    /**
     * Returns the accumulator batching updates for the given peer, creating it on first use.
     *
     * @param peer target peer
     * @return the peer's accumulator
     */
    private UpdateAccumulator getAccumulator(Identifier peer) {
        return this.pendingUpdates.computeIfAbsent(peer, id -> new UpdateAccumulator(id));
    }

    /**
     * Reports whether the counter total over the last {@link #LOAD_WINDOW} slots exceeds
     * {@link #HIGH_LOAD_THRESHOLD}.
     *
     * @return {@code true} if the service considers itself under high load
     */
    private boolean underHighLoad() {
        return this.counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
    }

    /**
     * Scheduled task: sends an advertisement to one random peer unless the service is
     * closed or under high load.
     */
    private void performAntiEntropy() {
        try {
            if (this.underHighLoad() || !this.open) {
                return;
            }
            this.pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
        } catch (Exception e) {
            // An exception escaping a fixed-rate task suppresses every later execution,
            // so it is logged here instead of propagated.
            this.log.error("Exception thrown while sending advertisement", e);
        }
    }

    /**
     * Picks one peer at random from the peers currently supplied by the provider.
     *
     * @return a random peer, or empty when there are none
     */
    private Optional<Identifier> pickRandomActivePeer() {
        List<Identifier> peers = new ArrayList<>(this.peerProvider.get());
        if (peers.isEmpty()) {
            return Optional.empty();
        }
        Collections.shuffle(peers);
        return Optional.of(peers.get(0));
    }

    /**
     * Advertises digests of all known updates to the peer and, once the peer reports the
     * advertisement as processed, queues the updates for the keys it asked for and records
     * the time the exchange was started.
     *
     * @param peer peer to advertise to
     */
    private void sendAdvertisementToPeer(Identifier peer) {
        long updateTime = System.currentTimeMillis();
        AntiEntropyAdvertisement<K> advertisement;
        synchronized (this) {
            // transformValues is a live view; copy it while holding the monitor so a
            // concurrent mutation cannot break the iteration.
            advertisement = new AntiEntropyAdvertisement<K>(
                    ImmutableMap.copyOf(Maps.transformValues(this.updates, GossipUpdate::digest)));
        }
        this.protocol.advertise(peer, advertisement).whenComplete((response, error) -> {
            if (error != null) {
                this.log.debug("Failed to send anti-entropy advertisement to {}: {}", peer, error.getMessage());
                return;
            }
            if (response.status() != AntiEntropyResponse.Status.PROCESSED) {
                return;
            }
            if (!response.keys().isEmpty()) {
                UpdateAccumulator accumulator = this.getAccumulator(peer);
                for (K key : response.keys()) {
                    GossipUpdate<K, V> update = this.currentUpdate(key);
                    if (update != null) {
                        accumulator.add(update);
                    }
                }
            }
            this.peerUpdateTimes.put(peer, updateTime);
        });
    }

    /**
     * Looks up the stored update for a subject under the instance monitor.
     *
     * @param key subject to look up
     * @return the stored update, or {@code null} if there is none
     */
    private synchronized GossipUpdate<K, V> currentUpdate(K key) {
        return this.updates.get(key);
    }

    /**
     * Queues the update for every peer currently supplied by the provider.
     *
     * @param event update to send
     */
    private void notifyPeers(GossipUpdate<K, V> event) {
        this.notifyPeers(event, this.peerProvider.get());
    }

    /**
     * Queues the update for the given peers.
     *
     * @param event update to send
     * @param peers peers to notify; may be {@code null}
     */
    private void notifyPeers(GossipUpdate<K, V> event, Collection<Identifier> peers) {
        this.queueUpdate(event, peers);
    }

    /**
     * Adds the update to the accumulator of each given peer.
     *
     * @param event update to queue
     * @param peers peers to queue for; {@code null} is treated as no peers
     */
    private void queueUpdate(GossipUpdate<K, V> event, Collection<Identifier> peers) {
        if (peers == null) {
            return;
        }
        for (Identifier peer : peers) {
            this.getAccumulator(peer).add(event);
        }
    }

    /**
     * Scheduled task: drops tombstones created before the oldest recorded update time among
     * the current peers. A peer without a recorded time counts as 0, and so does an empty
     * peer set, in which case nothing is purged.
     */
    private synchronized void purgeTombstones() {
        try {
            long minTombstoneTime = this.peerProvider.get().stream()
                    .mapToLong(peer -> this.peerUpdateTimes.getOrDefault(peer, 0L))
                    .min()
                    .orElse(0L);
            this.updates.values().removeIf(
                    update -> update.isTombstone() && update.creationTime() < minTombstoneTime);
        } catch (Exception e) {
            // Same reasoning as performAntiEntropy: keep the periodic task alive.
            this.log.error("Exception thrown while purging tombstones", e);
        }
    }

    /**
     * Marks the service closed, unregisters the gossip listener and cancels the scheduled
     * tasks without interrupting a run that is already in progress.
     */
    @Override
    public void close() {
        this.open = false;
        this.protocol.unregisterGossipListener();
        this.updateFuture.cancel(false);
        if (this.purgeFuture != null) {
            this.purgeFuture.cancel(false);
        }
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this).add("protocol", this.protocol).toString();
    }

    /**
     * Builder for {@link AntiEntropyService}.
     *
     * <p>Defaults: a direct event executor, a one second anti-entropy interval, tombstones
     * enabled and a one minute purge interval. The protocol, peer provider and
     * communication executor have no default and are checked when {@link #build()} runs.
     *
     * @param <K> subject (key) type
     * @param <V> value type
     */
    public static class Builder<K, V>
            implements GossipService.Builder<K, V> {
        protected AntiEntropyProtocol protocol;
        protected Supplier<Collection<Identifier>> peerProvider;
        protected Executor eventExecutor = MoreExecutors.directExecutor();
        protected ScheduledExecutorService communicationExecutor;
        protected Duration antiEntropyInterval = Duration.ofSeconds(1L);
        protected boolean tombstonesDisabled = false;
        protected Duration purgeInterval = Duration.ofMinutes(1L);

        /**
         * Sets the protocol used to talk to peers.
         *
         * @param protocol the protocol; must not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withProtocol(AntiEntropyProtocol protocol) {
            this.protocol = Preconditions.checkNotNull(protocol, "protocol");
            return this;
        }

        /**
         * Sets the supplier of the current peer set.
         *
         * @param peerProvider the supplier; must not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withPeerProvider(Supplier<Collection<Identifier>> peerProvider) {
            this.peerProvider = Preconditions.checkNotNull(peerProvider, "peerProvider cannot be null");
            return this;
        }

        /**
         * Sets the executor on which listeners are notified.
         *
         * @param executor the executor; must not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withEventExecutor(Executor executor) {
            this.eventExecutor = Preconditions.checkNotNull(executor, "executor cannot be null");
            return this;
        }

        /**
         * Sets the executor used for scheduled tasks and for sending batches.
         *
         * @param executor the executor; must not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withCommunicationExecutor(ScheduledExecutorService executor) {
            this.communicationExecutor = Preconditions.checkNotNull(executor, "executor cannot be null");
            return this;
        }

        /**
         * Sets the period between anti-entropy rounds.
         *
         * @param antiEntropyInterval the interval; must not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withAntiEntropyInterval(Duration antiEntropyInterval) {
            this.antiEntropyInterval =
                    Preconditions.checkNotNull(antiEntropyInterval, "antiEntropyInterval cannot be null");
            return this;
        }

        /**
         * Sets whether tombstones are disabled.
         *
         * @param tombstonesDisabled {@code true} to disable tombstones
         * @return this builder
         */
        public Builder<K, V> withTombstonesDisabled(boolean tombstonesDisabled) {
            this.tombstonesDisabled = tombstonesDisabled;
            return this;
        }

        /**
         * Sets the period between tombstone purges.
         *
         * @param purgeInterval the interval; must not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withPurgeInterval(Duration purgeInterval) {
            this.purgeInterval = Preconditions.checkNotNull(purgeInterval, "purgeInterval cannot be null");
            return this;
        }

        /**
         * Builds the service from the configured values.
         *
         * @return a new {@link AntiEntropyService}
         * @throws NullPointerException if a required value was never set
         */
        public GossipService<K, V> build() {
            return new AntiEntropyService<K, V>(
                    this.protocol,
                    this.peerProvider,
                    this.eventExecutor,
                    this.communicationExecutor,
                    this.antiEntropyInterval,
                    this.tombstonesDisabled,
                    this.purgeInterval);
        }
    }

    /**
     * Accumulator that batches the updates destined for a single peer and sends each batch
     * as one gossip message.
     */
    private final class UpdateAccumulator
            extends AbstractAccumulator<GossipUpdate<K, V>> {
        private final Identifier peer;

        private UpdateAccumulator(Identifier peer) {
            super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
            this.peer = peer;
        }

        /**
         * Collapses the batch to one update per subject and sends it to the peer on the
         * communication executor. When a subject occurs more than once the update with the
         * newer timestamp wins; on a tie the earlier item in the batch is kept.
         *
         * @param items accumulated updates
         */
        public void processItems(List<GossipUpdate<K, V>> items) {
            Map<K, GossipUpdate<K, V>> latest = new HashMap<>();
            for (GossipUpdate<K, V> item : items) {
                // merge() only consults the function when the subject is already present,
                // so the first update for a subject is stored without a comparison.
                latest.merge(item.subject(), item, (existing, candidate) ->
                        candidate.timestamp().isNewerThan(existing.timestamp()) ? candidate : existing);
            }
            AntiEntropyService.this.communicationExecutor.execute(() -> {
                try {
                    AntiEntropyService.this.protocol.gossip(
                            this.peer,
                            new GossipMessage<K, V>(AntiEntropyService.this.logicalClock.increment(), latest.values()));
                } catch (Exception e) {
                    AntiEntropyService.this.log.warn("Failed to send to {}", this.peer, e);
                }
            });
        }
    }
}

