/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.protocol;

import com.esotericsoftware.kryo.Serializer;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.atomix.cluster.BootstrapService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.Node;
import io.atomix.cluster.discovery.NodeDiscoveryEvent;
import io.atomix.cluster.discovery.NodeDiscoveryEventListener;
import io.atomix.cluster.discovery.NodeDiscoveryService;
import io.atomix.cluster.impl.AddressSerializer;
import io.atomix.cluster.impl.PhiAccrualFailureDetector;
import io.atomix.cluster.protocol.GroupMembershipEvent;
import io.atomix.cluster.protocol.GroupMembershipEventListener;
import io.atomix.cluster.protocol.GroupMembershipProtocol;
import io.atomix.cluster.protocol.GroupMembershipProtocolConfig;
import io.atomix.cluster.protocol.HeartbeatMembershipProtocolBuilder;
import io.atomix.cluster.protocol.HeartbeatMembershipProtocolConfig;
import io.atomix.utils.Version;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.event.AbstractListenerManager;
import io.atomix.utils.event.Event;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.serializer.Namespaces;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeartbeatMembershipProtocol
extends AbstractListenerManager<GroupMembershipEvent, GroupMembershipEventListener>
implements GroupMembershipProtocol {
    public static final Type TYPE = new Type();
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatMembershipProtocol.class);
    private final HeartbeatMembershipProtocolConfig config;
    private static final String HEARTBEAT_MESSAGE = "atomix-cluster-membership";
    private static final io.atomix.utils.serializer.Serializer SERIALIZER = io.atomix.utils.serializer.Serializer.using((Namespace)Namespace.builder().register(Namespaces.BASIC).nextId(500).register(new Class[]{MemberId.class}).register(new Class[]{GossipMember.class}).register((Serializer)new AddressSerializer(), new Class[]{Address.class}).build("ClusterMembershipService"));
    private volatile NodeDiscoveryService discoveryService;
    private volatile BootstrapService bootstrapService;
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile GossipMember localMember;
    private volatile Properties localProperties = new Properties();
    private final Map<MemberId, GossipMember> members = Maps.newConcurrentMap();
    private final Map<MemberId, PhiAccrualFailureDetector> failureDetectors = Maps.newConcurrentMap();
    private final NodeDiscoveryEventListener discoveryEventListener = this::handleDiscoveryEvent;
    private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(Threads.namedThreads((String)"atomix-cluster-heartbeat-sender", (Logger)LOGGER));
    private final ExecutorService eventExecutor = Executors.newSingleThreadExecutor(Threads.namedThreads((String)"atomix-cluster-events", (Logger)LOGGER));
    private ScheduledFuture<?> heartbeatFuture;

    public static HeartbeatMembershipProtocolBuilder builder() {
        return new HeartbeatMembershipProtocolBuilder();
    }

    public HeartbeatMembershipProtocol(HeartbeatMembershipProtocolConfig config) {
        this.config = config;
    }

    public GroupMembershipProtocolConfig config() {
        return this.config;
    }

    @Override
    public Set<Member> getMembers() {
        return ImmutableSet.copyOf(this.members.values());
    }

    @Override
    public Member getMember(MemberId memberId) {
        return this.members.get(memberId);
    }

    protected void post(GroupMembershipEvent event) {
        this.eventExecutor.execute(() -> super.post((Event)event));
    }

    private void handleDiscoveryEvent(NodeDiscoveryEvent event) {
        switch ((NodeDiscoveryEvent.Type)event.type()) {
            case JOIN: {
                this.handleJoinEvent((Node)event.subject());
                break;
            }
            case LEAVE: {
                this.handleLeaveEvent((Node)event.subject());
                break;
            }
            default: {
                throw new AssertionError();
            }
        }
    }

    private void handleJoinEvent(Node node) {
        GossipMember member = new GossipMember(MemberId.from((String)((Object)node.id().id())), node.address());
        if (!this.members.containsKey(member.id())) {
            this.sendHeartbeat(member);
        }
    }

    private void handleLeaveEvent(Node node) {
        this.members.compute(MemberId.from((String)((Object)node.id().id())), (id, member) -> member == null || !member.isActive() ? null : member);
    }

    private CompletableFuture<Void> sendHeartbeats() {
        this.checkMetadata();
        Stream<GossipMember> clusterMembers = this.members.values().stream().filter(member -> !member.id().equals(this.localMember.id()));
        Stream<GossipMember> providerMembers = this.discoveryService.getNodes().stream().filter(node -> !this.members.containsKey(MemberId.from((String)((Object)node.id().id())))).map(node -> new GossipMember(MemberId.from((String)((Object)node.id().id())), node.address()));
        return Futures.allOf(Stream.concat(clusterMembers, providerMembers).map(member -> {
            LOGGER.trace("{} - Sending heartbeat: {}", (Object)this.localMember.id(), member);
            return this.sendHeartbeat((GossipMember)member).exceptionally(v -> null);
        }).collect(Collectors.toList())).thenApply(v -> null);
    }

    private void checkMetadata() {
        if (!this.localMember.properties().equals(this.localProperties)) {
            this.localProperties = new Properties();
            this.localProperties.putAll((Map<?, ?>)this.localMember.properties());
            this.post(new GroupMembershipEvent(GroupMembershipEvent.Type.METADATA_CHANGED, this.localMember));
        }
    }

    private CompletableFuture<Void> sendHeartbeat(GossipMember member) {
        return ((CompletableFuture)((CompletableFuture)this.bootstrapService.getMessagingService().sendAndReceive(member.address(), HEARTBEAT_MESSAGE, SERIALIZER.encode((Object)this.localMember)).whenCompleteAsync((response, error) -> {
            if (error == null) {
                Collection remoteMembers = (Collection)SERIALIZER.decode(response);
                for (GossipMember remoteMember : remoteMembers) {
                    if (remoteMember.id().equals(this.localMember.id())) continue;
                    this.updateMember(remoteMember, remoteMember.id().equals(member.id()));
                }
            } else {
                PhiAccrualFailureDetector failureDetector;
                double phi;
                LOGGER.debug("{} - Sending heartbeat to {} failed", new Object[]{this.localMember.id(), member, error});
                if (member.isReachable()) {
                    member.setReachable(false);
                    this.post(new GroupMembershipEvent(GroupMembershipEvent.Type.REACHABILITY_CHANGED, member));
                }
                if (((phi = (failureDetector = this.failureDetectors.computeIfAbsent(member.id(), n -> new PhiAccrualFailureDetector())).phi()) >= (double)this.config.getPhiFailureThreshold() || phi == 0.0 && System.currentTimeMillis() - failureDetector.lastUpdated() > this.config.getFailureTimeout().toMillis()) && this.members.remove(member.id()) != null) {
                    this.failureDetectors.remove(member.id());
                    this.post(new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_REMOVED, member));
                }
            }
        }, (Executor)this.heartbeatScheduler)).exceptionally(e -> null)).thenApply(v -> null);
    }

    private byte[] handleHeartbeat(Address address, byte[] message) {
        GossipMember remoteMember = (GossipMember)SERIALIZER.decode(message);
        LOGGER.trace("{} - Received heartbeat: {}", (Object)this.localMember.id(), (Object)remoteMember);
        this.failureDetectors.computeIfAbsent(remoteMember.id(), n -> new PhiAccrualFailureDetector()).report();
        this.updateMember(remoteMember, true);
        return SERIALIZER.encode((Object)Lists.newArrayList((Iterable)this.members.values().stream().filter(member -> member.isReachable()).collect(Collectors.toList())));
    }

    private void updateMember(GossipMember remoteMember, boolean direct) {
        GossipMember localMember = this.members.get(remoteMember.id());
        if (localMember == null) {
            remoteMember.setActive(true);
            remoteMember.setReachable(true);
            this.members.put(remoteMember.id(), remoteMember);
            this.post(new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, remoteMember));
        } else if (!Objects.equals(localMember.version(), remoteMember.version())) {
            this.members.remove(localMember.id());
            localMember.setReachable(false);
            this.post(new GroupMembershipEvent(GroupMembershipEvent.Type.REACHABILITY_CHANGED, localMember));
            localMember.setActive(false);
            this.post(new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_REMOVED, localMember));
            this.members.put(remoteMember.id(), remoteMember);
            remoteMember.setActive(true);
            remoteMember.setReachable(true);
            this.post(new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, remoteMember));
        } else if (!Objects.equals(localMember.properties(), remoteMember.properties())) {
            if (!localMember.isReachable()) {
                localMember.setReachable(true);
                this.post(new GroupMembershipEvent(GroupMembershipEvent.Type.REACHABILITY_CHANGED, localMember));
            }
            localMember.properties().putAll((Map<?, ?>)remoteMember.properties());
            this.post(new GroupMembershipEvent(GroupMembershipEvent.Type.METADATA_CHANGED, localMember));
        } else if (!localMember.isReachable() && direct) {
            localMember.setReachable(true);
            localMember.setTerm(localMember.getTerm() + 1L);
            this.post(new GroupMembershipEvent(GroupMembershipEvent.Type.REACHABILITY_CHANGED, localMember));
        } else if (!localMember.isReachable() && remoteMember.getTerm() > localMember.getTerm()) {
            localMember.setReachable(true);
            localMember.setTerm(remoteMember.getTerm());
            this.post(new GroupMembershipEvent(GroupMembershipEvent.Type.REACHABILITY_CHANGED, localMember));
        }
    }

    @Override
    public CompletableFuture<Void> join(BootstrapService bootstrap, NodeDiscoveryService discovery, Member member) {
        if (this.started.compareAndSet(false, true)) {
            this.bootstrapService = bootstrap;
            this.discoveryService = discovery;
            this.localMember = new GossipMember(member.id(), member.address(), member.zone(), member.rack(), member.host(), member.properties(), member.version(), System.currentTimeMillis());
            this.discoveryService.addListener(this.discoveryEventListener);
            LOGGER.info("{} - Member activated: {}", (Object)this.localMember.id(), (Object)this.localMember);
            this.localMember.setActive(true);
            this.localMember.setReachable(true);
            this.members.put(this.localMember.id(), this.localMember);
            this.post(new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_ADDED, this.localMember));
            this.bootstrapService.getMessagingService().registerHandler(HEARTBEAT_MESSAGE, this::handleHeartbeat, (Executor)this.heartbeatScheduler);
            this.heartbeatFuture = this.heartbeatScheduler.scheduleAtFixedRate(this::sendHeartbeats, 0L, this.config.getHeartbeatInterval().toMillis(), TimeUnit.MILLISECONDS);
            LOGGER.info("Started");
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> leave(Member member) {
        if (this.started.compareAndSet(true, false)) {
            this.discoveryService.removeListener(this.discoveryEventListener);
            this.heartbeatFuture.cancel(true);
            this.heartbeatScheduler.shutdownNow();
            this.eventExecutor.shutdownNow();
            LOGGER.info("{} - Member deactivated: {}", (Object)this.localMember.id(), (Object)this.localMember);
            this.localMember.setActive(false);
            this.localMember.setReachable(false);
            this.members.clear();
            this.bootstrapService.getMessagingService().unregisterHandler(HEARTBEAT_MESSAGE);
            LOGGER.info("Stopped");
        }
        return CompletableFuture.completedFuture(null);
    }

    private static class GossipMember
    extends Member {
        private final Version version;
        private final long timestamp;
        private volatile boolean active;
        private volatile boolean reachable;
        private volatile long term;

        GossipMember(MemberId id, Address address) {
            super(id, address);
            this.version = null;
            this.timestamp = 0L;
        }

        GossipMember(MemberId id, Address address, String zone, String rack, String host, Properties properties, Version version, long timestamp) {
            super(id, address, zone, rack, host, properties);
            this.version = version;
            this.timestamp = timestamp;
        }

        @Override
        public Version version() {
            return this.version;
        }

        @Override
        public long timestamp() {
            return this.timestamp;
        }

        void setActive(boolean active) {
            this.active = active;
        }

        void setReachable(boolean reachable) {
            this.reachable = reachable;
        }

        @Override
        public boolean isActive() {
            return this.active;
        }

        @Override
        public boolean isReachable() {
            return this.reachable;
        }

        long getTerm() {
            return this.term;
        }

        void setTerm(long term) {
            this.term = term;
        }
    }

    public static class Type
    implements GroupMembershipProtocol.Type<HeartbeatMembershipProtocolConfig> {
        private static final String NAME = "heartbeat";

        public String name() {
            return NAME;
        }

        public HeartbeatMembershipProtocolConfig newConfig() {
            return new HeartbeatMembershipProtocolConfig();
        }

        @Override
        public GroupMembershipProtocol newProtocol(HeartbeatMembershipProtocolConfig config) {
            return new HeartbeatMembershipProtocol(config);
        }
    }
}

