/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.impl;

import com.esotericsoftware.kryo.Serializer;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.atomix.cluster.BootstrapMetadataService;
import io.atomix.cluster.ClusterEvent;
import io.atomix.cluster.ClusterEventListener;
import io.atomix.cluster.ClusterMetadata;
import io.atomix.cluster.ClusterMetadataEvent;
import io.atomix.cluster.ClusterMetadataEventListener;
import io.atomix.cluster.ClusterService;
import io.atomix.cluster.CoreMetadataService;
import io.atomix.cluster.ManagedClusterService;
import io.atomix.cluster.Node;
import io.atomix.cluster.NodeId;
import io.atomix.cluster.impl.ClusterHeartbeat;
import io.atomix.cluster.impl.DefaultCoreMetadataService;
import io.atomix.cluster.impl.PhiAccrualFailureDetector;
import io.atomix.cluster.impl.StatefulNode;
import io.atomix.messaging.BroadcastService;
import io.atomix.messaging.MessagingService;
import io.atomix.utils.concurrent.ComposableFuture;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.event.AbstractListenerManager;
import io.atomix.utils.event.Event;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces;
import io.atomix.utils.serializer.Namespace;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultClusterService
extends AbstractListenerManager<ClusterEvent, ClusterEventListener>
implements ManagedClusterService {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClusterService.class);
    private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
    private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
    private static final long DEFAULT_FAILURE_TIME = 10000L;
    private static final String HEARTBEAT_MESSAGE = "atomix-cluster-heartbeat";
    private int heartbeatInterval = 100;
    private int phiFailureThreshold = 10;
    private static final io.atomix.utils.serializer.Serializer SERIALIZER = io.atomix.utils.serializer.Serializer.using((Namespace)KryoNamespace.builder().register(KryoNamespaces.BASIC).nextId(500).register(new Class[]{NodeId.class}).register(new Class[]{Node.Type.class}).register(new Class[]{Node.State.class}).register(new Class[]{ClusterHeartbeat.class}).register(new Class[]{StatefulNode.class}).register((Serializer)new DefaultCoreMetadataService.AddressSerializer(), new Class[]{Address.class}).build("ClusterService"));
    private final MessagingService messagingService;
    private final BroadcastService broadcastService;
    private final BootstrapMetadataService bootstrapMetadataService;
    private final CoreMetadataService coreMetadataService;
    private final AtomicBoolean started = new AtomicBoolean();
    private final StatefulNode localNode;
    private final Map<NodeId, StatefulNode> nodes = Maps.newConcurrentMap();
    private final Map<NodeId, PhiAccrualFailureDetector> failureDetectors = Maps.newConcurrentMap();
    private final ClusterMetadataEventListener metadataEventListener = this::handleMetadataEvent;
    private final Consumer<byte[]> broadcastListener = this::handleBroadcastMessage;
    private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(Threads.namedThreads((String)"atomix-cluster-heartbeat-sender", (Logger)LOGGER));
    private final ExecutorService heartbeatExecutor = Executors.newSingleThreadExecutor(Threads.namedThreads((String)"atomix-cluster-heartbeat-receiver", (Logger)LOGGER));
    private ScheduledFuture<?> heartbeatFuture;

    public DefaultClusterService(Node localNode, BootstrapMetadataService bootstrapMetadataService, CoreMetadataService coreMetadataService, MessagingService messagingService, BroadcastService broadcastService) {
        this.bootstrapMetadataService = (BootstrapMetadataService)Preconditions.checkNotNull((Object)bootstrapMetadataService, (Object)"bootstrapMetadataService cannot be null");
        this.coreMetadataService = (CoreMetadataService)Preconditions.checkNotNull((Object)coreMetadataService, (Object)"coreMetadataService cannot be null");
        this.messagingService = (MessagingService)Preconditions.checkNotNull((Object)messagingService, (Object)"messagingService cannot be null");
        this.broadcastService = (BroadcastService)Preconditions.checkNotNull((Object)broadcastService, (Object)"broadcastService cannot be null");
        this.localNode = new StatefulNode(localNode.id(), localNode.type(), localNode.address(), localNode.zone(), localNode.rack(), localNode.host());
    }

    @Override
    public Node getLocalNode() {
        return this.localNode;
    }

    @Override
    public Set<Node> getNodes() {
        return ImmutableSet.copyOf((Collection)this.nodes.values().stream().filter(node -> node.type() == Node.Type.CORE || node.getState() == Node.State.ACTIVE).collect(Collectors.toList()));
    }

    @Override
    public Node getNode(NodeId nodeId) {
        Node node = this.nodes.get(nodeId);
        return node != null && (node.type() == Node.Type.CORE || node.getState() == Node.State.ACTIVE) ? node : null;
    }

    private void broadcastIdentity() {
        this.broadcastService.broadcast(SERIALIZER.encode((Object)this.localNode));
    }

    private void handleBroadcastMessage(byte[] message) {
        StatefulNode node = (StatefulNode)SERIALIZER.decode(message);
        if (this.nodes.putIfAbsent(node.id(), node) == null) {
            this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ADDED, node));
            this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, node));
            this.sendHeartbeats();
        }
    }

    private CompletableFuture<Void> sendHeartbeats() {
        Stream<StatefulNode> clusterNodes = this.nodes.values().stream().filter(node -> !node.id().equals(this.getLocalNode().id()));
        Stream<StatefulNode> bootstrapNodes = this.bootstrapMetadataService.getMetadata().nodes().stream().filter(node -> !node.id().equals(this.getLocalNode().id()) && !this.nodes.containsKey(node.id())).map(node -> new StatefulNode(node.id(), node.type(), node.address(), node.zone(), node.rack(), node.host()));
        byte[] payload = SERIALIZER.encode((Object)new ClusterHeartbeat(this.localNode.id(), this.localNode.type(), this.localNode.zone(), this.localNode.rack(), this.localNode.host()));
        return Futures.allOf(Stream.concat(clusterNodes, bootstrapNodes).map(node -> {
            LOGGER.trace("{} - Sending heartbeat: {}", (Object)this.localNode.id(), (Object)node.id());
            CompletableFuture<Void> future = this.sendHeartbeat(node.address(), payload);
            PhiAccrualFailureDetector failureDetector = this.failureDetectors.computeIfAbsent(node.id(), n -> new PhiAccrualFailureDetector());
            double phi = failureDetector.phi();
            if (phi >= (double)this.phiFailureThreshold || phi == 0.0 && failureDetector.lastUpdated() > 0L && System.currentTimeMillis() - failureDetector.lastUpdated() > 10000L) {
                if (node.getState() == Node.State.ACTIVE) {
                    this.deactivateNode((Node)node);
                }
            } else if (node.getState() == Node.State.INACTIVE) {
                this.activateNode((Node)node);
            }
            return future.exceptionally(v -> null);
        }).collect(Collectors.toList())).thenApply(v -> null);
    }

    private CompletableFuture<Void> sendHeartbeat(Address address, byte[] payload) {
        return ((CompletableFuture)((CompletableFuture)this.messagingService.sendAndReceive(address, HEARTBEAT_MESSAGE, payload).whenComplete((response, error) -> {
            if (error == null) {
                Collection nodes = (Collection)SERIALIZER.decode(response);
                boolean sendHeartbeats = false;
                for (StatefulNode node : nodes) {
                    if (this.nodes.putIfAbsent(node.id(), node) != null) continue;
                    this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ADDED, node));
                    this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, node));
                    sendHeartbeats = true;
                }
                if (sendHeartbeats) {
                    this.sendHeartbeats();
                }
            } else {
                LOGGER.debug("{} - Sending heartbeat to {} failed", new Object[]{this.localNode.id(), address, error});
            }
        })).exceptionally(e -> null)).thenApply(v -> null);
    }

    private byte[] handleHeartbeat(Address address, byte[] message) {
        ClusterHeartbeat heartbeat = (ClusterHeartbeat)SERIALIZER.decode(message);
        LOGGER.trace("{} - Received heartbeat: {}", (Object)this.localNode.id(), (Object)heartbeat.nodeId());
        this.failureDetectors.computeIfAbsent(heartbeat.nodeId(), n -> new PhiAccrualFailureDetector()).report();
        this.activateNode(new StatefulNode(heartbeat.nodeId(), heartbeat.nodeType(), address, heartbeat.zone(), heartbeat.rack(), heartbeat.host()));
        return SERIALIZER.encode(this.nodes.values().stream().filter(node -> node.type() == Node.Type.CLIENT).collect(Collectors.toList()));
    }

    private void activateNode(Node node) {
        StatefulNode existingNode = this.nodes.get(node.id());
        if (existingNode == null) {
            StatefulNode statefulNode = new StatefulNode(node.id(), node.type(), node.address(), node.zone(), node.rack(), node.host());
            LOGGER.info("{} - Node activated: {}", (Object)this.localNode.id(), (Object)statefulNode);
            statefulNode.setState(Node.State.ACTIVE);
            this.nodes.put(statefulNode.id(), statefulNode);
            this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ADDED, statefulNode));
            this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, statefulNode));
            this.sendHeartbeat(node.address(), SERIALIZER.encode((Object)new ClusterHeartbeat(this.localNode.id(), this.localNode.type(), this.localNode.zone(), this.localNode.rack(), this.localNode.host())));
        } else if (existingNode.getState() == Node.State.INACTIVE) {
            LOGGER.info("{} - Node activated: {}", (Object)this.localNode.id(), (Object)existingNode);
            existingNode.setState(Node.State.ACTIVE);
            this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, existingNode));
        }
    }

    private void deactivateNode(Node node) {
        StatefulNode existingNode = this.nodes.get(node.id());
        if (existingNode != null && existingNode.getState() == Node.State.ACTIVE) {
            LOGGER.info("{} - Node deactivated: {}", (Object)this.localNode.id(), (Object)existingNode);
            existingNode.setState(Node.State.INACTIVE);
            switch (existingNode.type()) {
                case CORE: {
                    this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode));
                    break;
                }
                case DATA: 
                case CLIENT: {
                    this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode));
                    this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, existingNode));
                    break;
                }
                default: {
                    throw new AssertionError();
                }
            }
        }
    }

    private void handleMetadataEvent(ClusterMetadataEvent event) {
        Set bootstrapNodes = ((ClusterMetadata)event.subject()).nodes().stream().map(node -> {
            StatefulNode existingNode = this.nodes.get(node.id());
            if (existingNode == null) {
                StatefulNode newNode = new StatefulNode(node.id(), node.type(), node.address(), node.zone(), node.rack(), node.host());
                this.nodes.put(newNode.id(), newNode);
                this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_ADDED, newNode));
            }
            return node.id();
        }).collect(Collectors.toSet());
        Set dataNodes = this.nodes.entrySet().stream().filter(entry -> ((StatefulNode)entry.getValue()).type() == Node.Type.CORE).map(entry -> (NodeId)entry.getKey()).collect(Collectors.toSet());
        Sets.SetView missingNodes = Sets.difference(dataNodes, bootstrapNodes);
        for (NodeId nodeId : missingNodes) {
            StatefulNode existingNode = this.nodes.remove(nodeId);
            if (existingNode == null) continue;
            this.post((Event)new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, existingNode));
        }
    }

    public CompletableFuture<ClusterService> start() {
        if (this.started.compareAndSet(false, true)) {
            this.coreMetadataService.addListener(this.metadataEventListener);
            this.broadcastService.addListener(this.broadcastListener);
            LOGGER.info("{} - Node activated: {}", (Object)this.localNode.id(), (Object)this.localNode);
            this.localNode.setState(Node.State.ACTIVE);
            this.nodes.put(this.localNode.id(), this.localNode);
            this.coreMetadataService.getMetadata().nodes().forEach(node -> this.nodes.putIfAbsent(node.id(), new StatefulNode(node.id(), node.type(), node.address(), node.zone(), node.rack(), node.host())));
            this.messagingService.registerHandler(HEARTBEAT_MESSAGE, this::handleHeartbeat, (Executor)this.heartbeatExecutor);
            ComposableFuture future = new ComposableFuture();
            this.broadcastIdentity();
            this.sendHeartbeats().whenComplete((r, e) -> future.complete(null));
            this.heartbeatFuture = this.heartbeatScheduler.scheduleWithFixedDelay(() -> {
                this.broadcastIdentity();
                this.sendHeartbeats();
            }, 0L, this.heartbeatInterval, TimeUnit.MILLISECONDS);
            return future.thenApply(v -> {
                LOGGER.info("Started");
                return this;
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    public boolean isRunning() {
        return this.started.get();
    }

    public CompletableFuture<Void> stop() {
        if (this.started.compareAndSet(true, false)) {
            this.heartbeatScheduler.shutdownNow();
            this.heartbeatExecutor.shutdownNow();
            LOGGER.info("{} - Node deactivated: {}", (Object)this.localNode.id(), (Object)this.localNode);
            this.localNode.setState(Node.State.INACTIVE);
            this.nodes.clear();
            this.heartbeatFuture.cancel(true);
            this.messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
            this.coreMetadataService.removeListener(this.metadataEventListener);
            LOGGER.info("Stopped");
        }
        return CompletableFuture.completedFuture(null);
    }
}

