/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.messaging.impl;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.ManagedClusterEventService;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.Subscription;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.serializer.Namespaces;
import io.atomix.utils.serializer.Serializer;
import io.atomix.utils.time.LogicalTimestamp;
import io.atomix.utils.time.WallClockTimestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultClusterEventService
implements ManagedClusterEventService {
    /** Logger for this service; also handed to the gossip thread factory in {@code start()}. */
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClusterEventService.class);
    /** Serializer used for gossip payloads and for the {@code InternalMessage} envelope around event payloads. */
    private static final Serializer SERIALIZER = Serializer.using((Namespace)Namespace.builder().register(Namespaces.BASIC).register(new Class[]{MemberId.class}).register(new Class[]{LogicalTimestamp.class}).register(new Class[]{WallClockTimestamp.class}).register(new Class[]{InternalSubscriptionInfo.class}).register(new Class[]{InternalMessage.class}).register(new Class[]{InternalMessage.Type.class}).build());
    /** Messaging subject on which members exchange subscription updates. */
    private static final String GOSSIP_MESSAGE_SUBJECT = "ClusterEventingService-update";
    /** Gossip interval, in milliseconds. */
    private static final long GOSSIP_INTERVAL_MILLIS = 1000L;
    /** Tombstone expiration interval, in milliseconds. */
    private static final long TOMBSTONE_EXPIRATION_MILLIS = 60000L;
    /** Source of cluster members and their reachability. */
    private final ClusterMembershipService membershipService;
    /** Transport used both for event delivery and for gossip. */
    private final MessagingService messagingService;
    /** Identifier of the local member, captured once at construction. */
    private final MemberId localMemberId;
    /** Counter from which each local subscription draws its logical timestamp. */
    private final AtomicLong logicalTime = new AtomicLong();
    /** Scheduler created in {@code start()}; runs gossip, tombstone purging and the gossip message handler. */
    private ScheduledExecutorService gossipExecutor;
    /** Per member: the local wall-clock time (ms) captured before the last update that member acknowledged. */
    private final Map<MemberId, Long> updateTimes = Maps.newConcurrentMap();
    /** Topic state keyed by topic name. */
    private final Map<String, InternalTopic> topics = Maps.newConcurrentMap();
    /** Set by {@code start()} and cleared by {@code stop()}. */
    private final AtomicBoolean started = new AtomicBoolean();

    public DefaultClusterEventService(ClusterMembershipService membershipService, MessagingService messagingService) {
        this.membershipService = membershipService;
        this.messagingService = messagingService;
        this.localMemberId = membershipService.getLocalMember().id();
    }

    /**
     * Sends a message to every reachable member that has a live subscription on the topic.
     * Delivery is fire-and-forget: the futures returned by the transport are not observed.
     *
     * @param topic topic to publish on
     * @param message message to publish
     * @param encoder converts the message to bytes
     */
    @Override
    public <M> void broadcast(String topic, M message, Function<M, byte[]> encoder) {
        InternalMessage envelope = new InternalMessage(InternalMessage.Type.ALL, encoder.apply(message));
        byte[] payload = SERIALIZER.encode(envelope);
        getSubscriberNodes(topic)
            .map(id -> membershipService.getMember(id))
            .filter(member -> member != null && member.isReachable())
            .forEach(member -> messagingService.sendAsync(member.address(), topic, payload));
    }

    /**
     * Sends a message to a single subscriber of the topic, chosen by the topic's round-robin iterator.
     *
     * @param topic topic to publish on
     * @param message message to publish
     * @param encoder converts the message to bytes
     * @return the transport's send future, or an already-completed future when no reachable
     *         subscriber is available (the message is dropped in that case)
     */
    @Override
    public <M> CompletableFuture<Void> unicast(String topic, M message, Function<M, byte[]> encoder) {
        MemberId target = getNextMemberId(topic);
        if (target == null) {
            return CompletableFuture.completedFuture(null);
        }
        Member member = membershipService.getMember(target);
        if (member == null || !member.isReachable()) {
            return CompletableFuture.completedFuture(null);
        }
        InternalMessage envelope = new InternalMessage(InternalMessage.Type.DIRECT, encoder.apply(message));
        return messagingService.sendAsync(member.address(), topic, SERIALIZER.encode(envelope));
    }

    /**
     * Sends a request to a single subscriber of the topic and decodes its reply.
     *
     * @param topic topic to send on
     * @param message request message
     * @param encoder converts the request to bytes
     * @param decoder converts the reply bytes to the result type
     * @param timeout timeout passed to the transport
     * @return a future with the decoded reply, or a future failed with
     *         {@link MessagingException.NoRemoteHandler} when no reachable subscriber is available
     */
    @Override
    public <M, R> CompletableFuture<R> send(String topic, M message, Function<M, byte[]> encoder, Function<byte[], R> decoder, Duration timeout) {
        MemberId target = getNextMemberId(topic);
        Member member = target == null ? null : membershipService.getMember(target);
        boolean deliverable = member != null && member.isReachable();
        if (!deliverable) {
            return Futures.exceptionalFuture(new MessagingException.NoRemoteHandler());
        }
        InternalMessage envelope = new InternalMessage(InternalMessage.Type.DIRECT, encoder.apply(message));
        byte[] payload = SERIALIZER.encode(envelope);
        return messagingService.sendAndReceive(member.address(), topic, payload, timeout).thenApply(decoder);
    }

    /**
     * Returns the distinct ids of members holding at least one live (non-tombstone)
     * subscription on the topic; empty when the topic is unknown.
     */
    private Stream<MemberId> getSubscriberNodes(String topicName) {
        InternalTopic topic = topics.get(topicName);
        if (topic != null) {
            return topic.remoteSubscriptions()
                .stream()
                .filter(info -> !info.isTombstone())
                .map(InternalSubscriptionInfo::memberId)
                .distinct();
        }
        return Stream.empty();
    }

    /**
     * Picks the member that should receive the next point-to-point message for a topic.
     *
     * @param topicName topic to look up
     * @return the id of the next subscriber in rotation, or {@code null} when the topic is
     *         unknown or currently has no live subscription
     */
    private MemberId getNextMemberId(String topicName) {
        InternalTopic topic = this.topics.get(topicName);
        if (topic == null) {
            return null;
        }
        TopicIterator iterator = topic.iterator();
        // A topic becomes visible in the map before its first subscription installs an
        // iterator, so a concurrent caller may still observe null here.
        if (iterator == null || !iterator.hasNext()) {
            return null;
        }
        return iterator.next().memberId();
    }

    /**
     * Subscribes a synchronous request/reply handler that runs on the given executor.
     *
     * @param topic topic to subscribe to
     * @param decoder converts incoming bytes to the message type
     * @param handler produces the reply for a message
     * @param encoder converts the reply to bytes
     * @param executor executor on which the handler is invoked
     * @return a future completed with the subscription once the update round to the other members finishes
     */
    @Override
    public <M, R> CompletableFuture<Subscription> subscribe(String topic, Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
        InternalTopic internalTopic = topics.computeIfAbsent(topic, name -> new InternalTopic(name));
        return internalTopic.subscribe(decoder, handler, encoder, executor);
    }

    /**
     * Subscribes an asynchronous request/reply handler.
     *
     * @param topic topic to subscribe to
     * @param decoder converts incoming bytes to the message type
     * @param handler produces a future reply for a message
     * @param encoder converts the reply to bytes
     * @return a future completed with the subscription once the update round to the other members finishes
     */
    @Override
    public <M, R> CompletableFuture<Subscription> subscribe(String topic, Function<byte[], M> decoder, Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
        InternalTopic internalTopic = topics.computeIfAbsent(topic, name -> new InternalTopic(name));
        return internalTopic.subscribe(decoder, handler, encoder);
    }

    /**
     * Subscribes a one-way consumer that runs on the given executor.
     *
     * @param topic topic to subscribe to
     * @param decoder converts incoming bytes to the message type
     * @param handler consumes each message
     * @param executor executor on which the handler is invoked
     * @return a future completed with the subscription once the update round to the other members finishes
     */
    @Override
    public <M> CompletableFuture<Subscription> subscribe(String topic, Function<byte[], M> decoder, Consumer<M> handler, Executor executor) {
        InternalTopic internalTopic = topics.computeIfAbsent(topic, name -> new InternalTopic(name));
        return internalTopic.subscribe(decoder, handler, executor);
    }

    /**
     * Returns an immutable snapshot of the local subscriptions on a topic.
     *
     * @param topicName topic to look up
     * @return the local subscriptions, or an empty list when the topic is unknown
     */
    @Override
    public List<Subscription> getSubscriptions(String topicName) {
        InternalTopic topic = topics.get(topicName);
        if (topic != null) {
            List<InternalSubscription> local = topic.localSubscriber().subscriptions();
            return ImmutableList.copyOf(local);
        }
        return ImmutableList.of();
    }

    /**
     * Merges subscription records received through gossip into the local topic state.
     * A record is "known" when an entry with the same member id and logical timestamp is
     * already stored. Unknown records are added; known records are only acted upon when the
     * incoming record is a tombstone.
     *
     * @param subscriptions records received from another member
     */
    private void update(Collection<InternalSubscriptionInfo> subscriptions) {
        for (InternalSubscriptionInfo incoming : subscriptions) {
            InternalTopic topic = topics.computeIfAbsent(incoming.topic, name -> new InternalTopic(name));
            boolean known = topic.remoteSubscriptions()
                .stream()
                .anyMatch(existing -> existing.memberId().equals(incoming.memberId())
                    && existing.logicalTimestamp().equals(incoming.logicalTimestamp()));
            if (!known) {
                topic.addRemoteSubscription(incoming);
            } else if (incoming.isTombstone()) {
                topic.removeRemoteSubscription(incoming);
            }
        }
    }

    /**
     * Runs one gossip round: sends pending subscription changes to one randomly chosen
     * reachable member other than the local one. Does nothing when no such member exists.
     */
    private void gossip() {
        List<Member> candidates = new ArrayList<>();
        for (Member node : membershipService.getMembers()) {
            if (!localMemberId.equals(node.id()) && node.isReachable()) {
                candidates.add(node);
            }
        }
        if (candidates.isEmpty()) {
            return;
        }
        Collections.shuffle(candidates);
        updateNode(candidates.get(0));
    }

    /**
     * Pushes pending subscription changes to every member except the local one.
     *
     * @return a future completed once every per-member update has finished
     */
    private CompletableFuture<Void> updateNodes() {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (Member node : membershipService.getMembers()) {
            if (!localMemberId.equals(node.id())) {
                pending.add(updateNode(node));
            }
        }
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
    }

    /**
     * Sends one member every subscription record whose timestamp is at or after the last
     * update that member acknowledged.
     *
     * @param member member to update
     * @return a future that always completes normally, whether or not the exchange succeeded;
     *         the member's last-update time is only advanced on success
     */
    private CompletableFuture<Void> updateNode(Member member) {
        // Capture the send time first so records created during the exchange are re-sent next round.
        long sentAt = System.currentTimeMillis();
        long since = updateTimes.getOrDefault(member.id(), 0L);
        List<InternalSubscriptionInfo> changed = new ArrayList<>();
        for (InternalTopic topic : topics.values()) {
            for (InternalSubscriptionInfo info : topic.remoteSubscriptions()) {
                if (info.timestamp().unixTimestamp() >= since) {
                    changed.add(info);
                }
            }
        }
        CompletableFuture<Void> done = new CompletableFuture<>();
        byte[] payload = SERIALIZER.encode(changed);
        messagingService.sendAndReceive(member.address(), GOSSIP_MESSAGE_SUBJECT, payload)
            .whenComplete((reply, failure) -> {
                if (failure == null) {
                    updateTimes.put(member.id(), sentAt);
                }
                done.complete(null);
            });
        return done;
    }

    /**
     * Drops tombstones that every other member has already been sent.
     *
     * <p>The cut-off is the oldest last-update time across the <em>remote</em> members. The
     * local member is excluded: it is never a target of {@code updateNode}, so it has no
     * entry in {@code updateTimes}, and including it would pin the cut-off at 0 and prevent
     * any tombstone from ever being purged. A remote member that has never acknowledged an
     * update still contributes 0 and therefore blocks purging. With no remote members the
     * cut-off stays at 0.
     */
    private void purgeTombstones() {
        long minTombstoneTime = this.membershipService.getMembers().stream()
            .filter(node -> !this.localMemberId.equals(node.id()))
            .mapToLong(node -> this.updateTimes.getOrDefault(node.id(), 0L))
            .min()
            .orElse(0L);
        for (InternalTopic topic : this.topics.values()) {
            topic.purgeTombstones(minTombstoneTime);
        }
    }

    /**
     * Starts the service: creates the gossip executor, schedules periodic gossip and
     * tombstone purging, and registers the handler for incoming gossip. Calling it while
     * already started has no effect.
     *
     * @return an already-completed future holding this service
     */
    public CompletableFuture<ClusterEventService> start() {
        if (!started.compareAndSet(false, true)) {
            return CompletableFuture.completedFuture(this);
        }
        gossipExecutor = Executors.newSingleThreadScheduledExecutor(
            Threads.namedThreads("atomix-cluster-event-executor-%d", LOGGER));
        gossipExecutor.scheduleAtFixedRate(
            this::gossip, GOSSIP_INTERVAL_MILLIS, GOSSIP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
        gossipExecutor.scheduleAtFixedRate(
            this::purgeTombstones, TOMBSTONE_EXPIRATION_MILLIS, TOMBSTONE_EXPIRATION_MILLIS, TimeUnit.MILLISECONDS);
        // Incoming gossip is applied on the gossip thread and acknowledged with an empty reply.
        messagingService.registerHandler(GOSSIP_MESSAGE_SUBJECT, (address, payload) -> {
            Collection<InternalSubscriptionInfo> updates = SERIALIZER.decode(payload);
            update(updates);
            return new byte[0];
        }, gossipExecutor);
        LOGGER.info("Started");
        return CompletableFuture.completedFuture(this);
    }

    /**
     * Reports whether the service is currently started.
     *
     * @return {@code true} between a successful {@code start()} and the next {@code stop()}
     */
    public boolean isRunning() {
        boolean running = started.get();
        return running;
    }

    /**
     * Stops the service: unregisters the gossip handler and shuts down the gossip executor.
     * Calling it while not started has no effect.
     *
     * @return an already-completed future
     */
    public CompletableFuture<Void> stop() {
        if (this.started.compareAndSet(true, false)) {
            // Unregister first: the handler was registered to run on the gossip executor, so
            // leaving it in place would dispatch later gossip to an executor that is shut down.
            this.messagingService.unregisterHandler(GOSSIP_MESSAGE_SUBJECT);
            if (this.gossipExecutor != null) {
                this.gossipExecutor.shutdown();
            }
            LOGGER.info("Stopped");
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Envelope placed around every event payload sent by this service. It is registered with
     * {@code SERIALIZER}, so it travels over the wire in serialized form.
     */
    private static class InternalMessage {
        // Delivery mode requested by the sender.
        private final Type type;
        // Application payload, already encoded by the caller's encoder.
        private final byte[] payload;

        InternalMessage(Type type, byte[] payload) {
            this.type = type;
            this.payload = payload;
        }

        /** Returns the delivery mode. */
        public Type type() {
            return this.type;
        }

        /** Returns the encoded application payload (the same array instance, not a copy). */
        public byte[] payload() {
            return this.payload;
        }

        /** Delivery mode of a message on the receiving member. */
        private static enum Type {
            // Delivered to one local subscription, selected in rotation by InternalSubscriber.
            DIRECT,
            // Delivered to every local subscription.
            ALL;

        }
    }

    /**
     * State of a single topic: the local subscriptions that handle incoming messages, and
     * the list of subscription records (local and remote, live and tombstoned) that is
     * gossiped to other members and used to pick message targets.
     *
     * <p>All mutations go through synchronized methods. Readers use the copy-on-write
     * record list and the volatile iterator without locking.
     */
    private class InternalTopic {
        private final String topic;
        private final InternalSubscriber subscribers = new InternalSubscriber();
        private final List<InternalSubscriptionInfo> subscriptions = Lists.newCopyOnWriteArrayList();
        // Starts as an empty iterator so readers never observe null; volatile because it is
        // replaced under the monitor but read without it.
        private volatile TopicIterator iterator = new TopicIterator(this.subscriptions);

        InternalTopic(String topic) {
            this.topic = topic;
        }

        /** Returns the dispatcher holding this topic's local subscriptions. */
        InternalSubscriber localSubscriber() {
            return this.subscribers;
        }

        /** Returns the live record list; despite the name it also contains local records. */
        List<InternalSubscriptionInfo> remoteSubscriptions() {
            return this.subscriptions;
        }

        /** Returns the current round-robin iterator over live records. */
        TopicIterator iterator() {
            return this.iterator;
        }

        /**
         * Adds a local subscription whose handler runs on {@code executor} and whose result
         * is encoded as the reply. A handler failure fails the reply future.
         */
        <M, R> CompletableFuture<Subscription> subscribe(Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
            return this.addLocalSubscription(new InternalSubscription(this, payload -> {
                CompletableFuture<byte[]> future = new CompletableFuture<>();
                executor.execute(() -> {
                    try {
                        future.complete(encoder.apply(handler.apply(decoder.apply(payload))));
                    } catch (Exception e) {
                        future.completeExceptionally(e);
                    }
                });
                return future;
            }));
        }

        /** Adds a local subscription whose handler produces its reply asynchronously. */
        <M, R> CompletableFuture<Subscription> subscribe(Function<byte[], M> decoder, Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
            return this.addLocalSubscription(new InternalSubscription(this,
                payload -> handler.apply(decoder.apply(payload)).thenApply(encoder)));
        }

        /**
         * Adds a local one-way subscription whose consumer runs on {@code executor}. The
         * reply future is completed immediately with {@code null}. A consumer failure cannot
         * reach the sender, so it is logged rather than silently dropped.
         */
        <M> CompletableFuture<Subscription> subscribe(Function<byte[], M> decoder, Consumer<M> handler, Executor executor) {
            return this.addLocalSubscription(new InternalSubscription(this, payload -> {
                executor.execute(() -> {
                    try {
                        handler.accept(decoder.apply(payload));
                    } catch (Exception e) {
                        LOGGER.warn("Subscriber for topic {} failed to handle a message", this.topic, e);
                    }
                });
                return CompletableFuture.completedFuture(null);
            }));
        }

        /**
         * Registers a local subscription, (re)registers the topic handler with the messaging
         * service and announces the new record to the other members.
         */
        private synchronized CompletableFuture<Subscription> addLocalSubscription(InternalSubscription subscription) {
            this.subscribers.add(subscription);
            this.subscriptions.add(subscription.metadata);
            this.iterator = new TopicIterator(this.subscriptions);
            DefaultClusterEventService.this.messagingService.registerHandler(subscription.topic(), this.subscribers);
            return DefaultClusterEventService.this.updateNodes().thenApply(v -> subscription);
        }

        /**
         * Removes a local subscription, replaces its record with a tombstone and announces
         * the change. The topic handler is unregistered once no local subscription is left.
         */
        private synchronized CompletableFuture<Void> removeLocalSubscription(InternalSubscription subscription) {
            this.subscribers.remove(subscription);
            this.subscriptions.remove(subscription.metadata);
            this.subscriptions.add(subscription.metadata.asTombstone());
            this.iterator = new TopicIterator(this.subscriptions);
            // Decide on the local subscriptions, not on tombstones: a tombstone was added just
            // above, so a "no tombstones" test could never be true.
            if (this.subscribers.subscriptions().isEmpty()) {
                DefaultClusterEventService.this.messagingService.unregisterHandler(subscription.topic());
            }
            return DefaultClusterEventService.this.updateNodes();
        }

        /** Stores a record learned through gossip. */
        synchronized void addRemoteSubscription(InternalSubscriptionInfo subscription) {
            this.subscriptions.add(subscription);
            this.iterator = new TopicIterator(this.subscriptions);
        }

        /**
         * Replaces every stored record for the same subscription with the given tombstone.
         * Records are matched on member id and logical timestamp because
         * {@code InternalSubscriptionInfo} defines no {@code equals} and the argument is a
         * deserialized copy, so {@code List.remove(Object)} would never match.
         */
        synchronized void removeRemoteSubscription(InternalSubscriptionInfo subscription) {
            this.subscriptions.removeIf(existing -> existing.memberId().equals(subscription.memberId())
                && existing.logicalTimestamp().equals(subscription.logicalTimestamp()));
            this.subscriptions.add(subscription);
            this.iterator = new TopicIterator(this.subscriptions);
        }

        /** Removes tombstones older than {@code minTombstoneTime} (milliseconds). */
        synchronized void purgeTombstones(long minTombstoneTime) {
            int startSize = this.subscriptions.size();
            this.subscriptions.removeIf(subscription -> subscription.isTombstone() && subscription.timestamp().unixTimestamp() < minTombstoneTime);
            if (this.subscriptions.size() != startSize) {
                this.iterator = new TopicIterator(this.subscriptions);
            }
        }
    }

    /**
     * Endless round-robin iterator over the live (non-tombstone) records of a topic, taken
     * as a snapshot at construction time. {@link #hasNext()} is true whenever the snapshot
     * is non-empty. The rotation counter is atomic, so the iterator can be shared.
     */
    private static class TopicIterator
    implements Iterator<InternalSubscriptionInfo> {
        private final AtomicInteger counter = new AtomicInteger();
        private final InternalSubscriptionInfo[] subscribers;

        TopicIterator(List<InternalSubscriptionInfo> subscribers) {
            // Collect into an explicitly mutable list because it is reversed in place below.
            List<InternalSubscriptionInfo> live = subscribers.stream()
                .filter(s -> !s.isTombstone())
                .collect(Collectors.toCollection(ArrayList::new));
            Collections.reverse(live);
            this.subscribers = live.toArray(new InternalSubscriptionInfo[0]);
        }

        @Override
        public boolean hasNext() {
            return this.subscribers.length > 0;
        }

        /**
         * Returns the next record in rotation.
         *
         * @throws java.util.NoSuchElementException if the snapshot holds no live record
         */
        @Override
        public InternalSubscriptionInfo next() {
            if (this.subscribers.length == 0) {
                // Honour the Iterator contract instead of failing with a division by zero.
                throw new java.util.NoSuchElementException("no live subscriptions");
            }
            // floorMod keeps the index non-negative after the counter wraps around.
            return this.subscribers[Math.floorMod(this.counter.incrementAndGet(), this.subscribers.length)];
        }
    }

    /**
     * Record describing one subscription of one member to one topic. It is registered with
     * {@code SERIALIZER} and exchanged between members through gossip.
     *
     * <p>The class defines neither {@code equals} nor {@code hashCode}, so collection lookups
     * and removals on instances are identity-based.
     */
    private static class InternalSubscriptionInfo {
        // Member that owns the subscription.
        private final MemberId memberId;
        // Name of the subscribed topic.
        private final String topic;
        // Logical timestamp assigned by the owning member when the subscription was created.
        private final LogicalTimestamp logicalTimestamp;
        // True when this record marks the subscription as removed.
        private final boolean tombstone;
        // Created together with the record; compared against last-update times when gossiping
        // and against the purge cut-off for tombstones.
        private final WallClockTimestamp timestamp = new WallClockTimestamp();

        /** Creates a live (non-tombstone) record. */
        InternalSubscriptionInfo(MemberId memberId, String topic, LogicalTimestamp logicalTimestamp) {
            this(memberId, topic, logicalTimestamp, false);
        }

        /** Creates a record, optionally flagged as a tombstone. */
        InternalSubscriptionInfo(MemberId memberId, String topic, LogicalTimestamp logicalTimestamp, boolean tombstone) {
            this.memberId = memberId;
            this.topic = topic;
            this.logicalTimestamp = logicalTimestamp;
            this.tombstone = tombstone;
        }

        /** Returns the id of the member that owns the subscription. */
        MemberId memberId() {
            return this.memberId;
        }

        /** Returns the topic name. */
        String topic() {
            return this.topic;
        }

        /** Returns the logical timestamp assigned at subscription time. */
        LogicalTimestamp logicalTimestamp() {
            return this.logicalTimestamp;
        }

        /** Returns the wall-clock timestamp of this record. */
        WallClockTimestamp timestamp() {
            return this.timestamp;
        }

        /** Returns whether this record marks a removed subscription. */
        boolean isTombstone() {
            return this.tombstone;
        }

        /**
         * Returns a tombstone copy with the same member, topic and logical timestamp. The
         * copy is a new instance and therefore carries its own newly constructed
         * wall-clock timestamp.
         */
        InternalSubscriptionInfo asTombstone() {
            return new InternalSubscriptionInfo(this.memberId, this.topic, this.logicalTimestamp, true);
        }
    }

    private static class InternalSubscriber
    implements BiFunction<Address, byte[], CompletableFuture<byte[]>> {
        private final AtomicInteger counter = new AtomicInteger();
        private InternalSubscription[] subscriptions = new InternalSubscription[0];

        private InternalSubscriber() {
        }

        List<InternalSubscription> subscriptions() {
            return ImmutableList.copyOf((Object[])this.subscriptions);
        }

        private InternalSubscription next() {
            InternalSubscription[] subscriptions = this.subscriptions;
            return subscriptions[this.counter.incrementAndGet() % subscriptions.length];
        }

        @Override
        public CompletableFuture<byte[]> apply(Address address, byte[] payload) {
            InternalMessage message = (InternalMessage)SERIALIZER.decode(payload);
            switch (message.type()) {
                case DIRECT: {
                    InternalSubscription subscription = this.next();
                    return (CompletableFuture)subscription.callback.apply(message.payload());
                }
            }
            for (InternalSubscription s : this.subscriptions) {
                s.callback.apply(message.payload());
            }
            return CompletableFuture.completedFuture(null);
        }

        void add(InternalSubscription subscription) {
            ArrayList<InternalSubscription> subscriptions = new ArrayList<InternalSubscription>(this.subscriptions.length + 1);
            subscriptions.addAll(Arrays.asList(this.subscriptions));
            subscriptions.add(subscription);
            this.subscriptions = subscriptions.toArray(new InternalSubscription[subscriptions.size()]);
        }

        void remove(InternalSubscription subscription) {
            ArrayList subscriptions = Lists.newArrayList((Object[])this.subscriptions);
            subscriptions.remove(subscription);
            this.subscriptions = subscriptions.toArray(new InternalSubscription[subscriptions.size()]);
        }
    }

    /**
     * A local subscription: the gossiped record describing it plus the callback that
     * handles incoming payloads and produces the reply future.
     */
    private class InternalSubscription
    implements Subscription {
        private final InternalTopic topic;
        private final InternalSubscriptionInfo metadata;
        private final Function<byte[], CompletableFuture<byte[]>> callback;

        InternalSubscription(InternalTopic topic, Function<byte[], CompletableFuture<byte[]>> callback) {
            this.topic = topic;
            // Each local subscription gets the next value of the service-wide logical clock.
            long logicalNow = logicalTime.incrementAndGet();
            LogicalTimestamp createdAt = new LogicalTimestamp(logicalNow);
            this.metadata = new InternalSubscriptionInfo(localMemberId, topic.topic, createdAt);
            this.callback = callback;
        }

        /** Returns the name of the subscribed topic. */
        @Override
        public String topic() {
            return metadata.topic();
        }

        /** Removes this subscription from its topic; completes once the removal has been announced. */
        @Override
        public CompletableFuture<Void> close() {
            return topic.removeLocalSubscription(this);
        }
    }
}

