/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.protocols.gossip;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.event.AbstractListenerManager;
import io.atomix.event.Event;
import io.atomix.protocols.gossip.GossipEvent;
import io.atomix.protocols.gossip.GossipEventListener;
import io.atomix.protocols.gossip.GossipService;
import io.atomix.protocols.gossip.protocol.GossipMessage;
import io.atomix.protocols.gossip.protocol.GossipProtocol;
import io.atomix.protocols.gossip.protocol.GossipUpdate;
import io.atomix.time.LogicalClock;
import io.atomix.time.LogicalTimestamp;
import io.atomix.time.Timestamp;
import io.atomix.utils.Identifier;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class DisseminationService<K, V>
extends AbstractListenerManager<GossipEvent<K, V>, GossipEventListener<K, V>>
implements GossipService<K, V> {
    private final GossipProtocol protocol;
    private final Supplier<Collection<Identifier>> peerProvider;
    private final Executor eventExecutor;
    private final boolean fastConvergence;
    private final boolean tombstonesDisabled;
    private final ScheduledFuture<?> updateFuture;
    private final ScheduledFuture<?> purgeFuture;
    private final Map<K, GossipUpdate<K, V>> updates = Maps.newLinkedHashMap();
    private final LogicalClock logicalClock = new LogicalClock();
    private final Map<Identifier, Long> peerUpdateTimes = Maps.newConcurrentMap();
    private final Map<Identifier, LogicalTimestamp> peerTimestamps = Maps.newHashMap();

    public static <K, V> Builder<K, V> builder() {
        return new Builder();
    }

    public DisseminationService(GossipProtocol<?> protocol, Supplier<Collection<Identifier>> peerProvider, Executor eventExecutor, ScheduledExecutorService communicationExecutor, Duration updateInterval, boolean fastConvergence, boolean tombstonesDisabled, Duration purgeInterval) {
        this.protocol = (GossipProtocol)Preconditions.checkNotNull(protocol, (Object)"protocol cannot be null");
        this.peerProvider = (Supplier)Preconditions.checkNotNull(peerProvider, (Object)"peerProvider cannot be null");
        this.eventExecutor = (Executor)Preconditions.checkNotNull((Object)eventExecutor, (Object)"eventExecutor cannot be null");
        this.fastConvergence = fastConvergence;
        this.tombstonesDisabled = tombstonesDisabled;
        protocol.registerGossipListener(this::update);
        this.updateFuture = communicationExecutor.scheduleAtFixedRate(this::gossip, 0L, updateInterval.toMillis(), TimeUnit.MILLISECONDS);
        this.purgeFuture = !tombstonesDisabled ? communicationExecutor.scheduleAtFixedRate(this::purgeTombstones, 0L, purgeInterval.toMillis(), TimeUnit.MILLISECONDS) : null;
    }

    protected void post(GossipEvent<K, V> event) {
        this.eventExecutor.execute(() -> super.post((Event)event));
    }

    public void process(GossipEvent<K, V> event) {
        LogicalTimestamp timestamp = this.logicalClock.increment();
        GossipUpdate<K, V> update = new GossipUpdate<K, V>(event.subject(), event.value(), (Timestamp)timestamp.asVersion());
        if (event.value() != null) {
            this.updates.put(event.subject(), update);
            if (this.fastConvergence) {
                this.updatePeers();
            }
        } else if (this.tombstonesDisabled) {
            this.updates.remove(event.subject());
        } else {
            this.updates.put(event.subject(), update);
            if (this.fastConvergence) {
                this.updatePeers();
            }
        }
        this.post(event);
    }

    private synchronized void update(GossipMessage<K, V> message) {
        this.logicalClock.update(message.timestamp());
        for (GossipUpdate<K, V> update : message.updates()) {
            GossipUpdate<K, V> existingUpdate = this.updates.get(update.subject());
            if (existingUpdate != null && (!existingUpdate.isTombstone() || update.isTombstone()) && !existingUpdate.timestamp().isOlderThan(update.timestamp())) continue;
            if (!this.tombstonesDisabled) {
                this.updates.put(update.subject(), update);
            }
            this.post(new GossipEvent<K, V>(update.creationTime(), update.subject(), update.value()));
        }
    }

    private synchronized void gossip() {
        ArrayList peers = Lists.newArrayList((Iterable)this.peerProvider.get());
        if (!peers.isEmpty()) {
            Collections.shuffle(peers);
            Identifier peer = (Identifier)peers.get(0);
            this.updatePeer(peer);
        }
    }

    private void updatePeers() {
        for (Identifier peer : this.peerProvider.get()) {
            this.updatePeer(peer);
        }
    }

    private synchronized void updatePeer(Identifier peer) {
        LogicalTimestamp updateTimestamp = this.logicalClock.increment();
        long updateTime = System.currentTimeMillis();
        LogicalTimestamp lastUpdate = this.peerTimestamps.computeIfAbsent(peer, n -> new LogicalTimestamp(0L));
        Collection filteredUpdates = this.updates.values().stream().filter(update -> update.timestamp().isNewerThan((Timestamp)lastUpdate)).collect(Collectors.toList());
        this.protocol.gossip(peer, new GossipMessage(updateTimestamp, filteredUpdates));
        this.peerTimestamps.put(peer, updateTimestamp);
        this.peerUpdateTimes.put(peer, updateTime);
    }

    private synchronized void purgeTombstones() {
        long minTombstoneTime = this.peerProvider.get().stream().map(peer -> this.peerUpdateTimes.getOrDefault(peer, 0L)).reduce(Math::min).orElse(0L);
        Iterator<Map.Entry<K, GossipUpdate<K, V>>> iterator = this.updates.entrySet().iterator();
        while (iterator.hasNext()) {
            GossipUpdate<K, V> update = iterator.next().getValue();
            if (!update.isTombstone() || update.creationTime() >= minTombstoneTime) continue;
            iterator.remove();
        }
    }

    @Override
    public void close() {
        this.protocol.unregisterGossipListener();
        this.updateFuture.cancel(false);
        if (this.purgeFuture != null) {
            this.purgeFuture.cancel(false);
        }
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("protocol", (Object)this.protocol).toString();
    }

    public static class Builder<K, V>
    implements GossipService.Builder<K, V> {
        protected GossipProtocol protocol;
        protected Supplier<Collection<Identifier>> peerProvider;
        protected Executor eventExecutor = MoreExecutors.directExecutor();
        protected ScheduledExecutorService communicationExecutor;
        protected Duration updateInterval = Duration.ofSeconds(1L);
        protected boolean fastConvergence = false;
        protected boolean tombstonesDisabled = false;
        protected Duration purgeInterval = Duration.ofMinutes(1L);

        public Builder<K, V> withProtocol(GossipProtocol protocol) {
            this.protocol = (GossipProtocol)Preconditions.checkNotNull((Object)protocol, (Object)"protocol");
            return this;
        }

        public Builder<K, V> withPeerProvider(Supplier<Collection<Identifier>> peerProvider) {
            this.peerProvider = (Supplier)Preconditions.checkNotNull(peerProvider, (Object)"peerProvider cannot be null");
            return this;
        }

        public Builder<K, V> withEventExecutor(Executor executor) {
            this.eventExecutor = (Executor)Preconditions.checkNotNull((Object)executor, (Object)"executor cannot be null");
            return this;
        }

        public Builder<K, V> withCommunicationExecutor(ScheduledExecutorService executor) {
            this.communicationExecutor = (ScheduledExecutorService)Preconditions.checkNotNull((Object)executor, (Object)"executor cannot be null");
            return this;
        }

        public Builder<K, V> withUpdateInterval(Duration updateInterval) {
            this.updateInterval = (Duration)Preconditions.checkNotNull((Object)updateInterval, (Object)"updateInterval cannot be null");
            return this;
        }

        public Builder<K, V> withFastConvergence(boolean fastConvergence) {
            this.fastConvergence = fastConvergence;
            return this;
        }

        public Builder<K, V> withTombstonesDisabled(boolean tombstonesDisabled) {
            this.tombstonesDisabled = tombstonesDisabled;
            return this;
        }

        public Builder<K, V> withPurgeInterval(Duration purgeInterval) {
            this.purgeInterval = (Duration)Preconditions.checkNotNull((Object)purgeInterval, (Object)"purgeInterval cannot be null");
            return this;
        }

        public GossipService<K, V> build() {
            return new DisseminationService(this.protocol, this.peerProvider, this.eventExecutor, this.communicationExecutor, this.updateInterval, this.fastConvergence, this.tombstonesDisabled, this.purgeInterval);
        }
    }
}

