/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.cluster.impl;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.GroupMembershipConfig;
import io.atomix.cluster.ManagedClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.impl.ClusterHeartbeat;
import io.atomix.cluster.impl.PhiAccrualFailureDetector;
import io.atomix.cluster.impl.StatefulMember;
import io.atomix.cluster.messaging.BroadcastService;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.utils.concurrent.ComposableFuture;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.event.AbstractListenerManager;
import io.atomix.utils.event.Event;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.serializer.Namespaces;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster membership service that discovers peers through broadcast messages and
 * point-to-point heartbeats, and tracks each peer's liveness with a
 * {@link PhiAccrualFailureDetector}.
 *
 * <p>Known members are kept in a concurrent map keyed by {@link MemberId}. Members are
 * never removed from that map while the service is running; instead their state is
 * toggled between {@link Member.State#ACTIVE} and {@link Member.State#INACTIVE}, and only
 * active members are exposed through {@link #getMembers()} and {@link #getMember(MemberId)}.
 *
 * <p>The heartbeat executors are created when the instance is constructed and shut down
 * by {@link #stop()}, so a stopped instance is not expected to be restartable.
 */
public class DefaultClusterMembershipService
        extends AbstractListenerManager<ClusterMembershipEvent, ClusterMembershipEventListener>
        implements ManagedClusterMembershipService {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClusterMembershipService.class);

    /** Message subject used for point-to-point heartbeat requests. */
    private static final String HEARTBEAT_MESSAGE = "atomix-cluster-heartbeat";

    /** Serializer for heartbeat payloads, member lists and broadcast identities. */
    private static final io.atomix.utils.serializer.Serializer SERIALIZER = io.atomix.utils.serializer.Serializer.using(
            Namespace.builder()
                    .register(Namespaces.BASIC)
                    .nextId(500)
                    .register(MemberId.class)
                    .register(MemberId.Type.class)
                    .register(Member.State.class)
                    .register(ClusterHeartbeat.class)
                    .register(StatefulMember.class)
                    .register(new AddressSerializer(), Address.class)
                    .build("ClusterMembershipService"));

    private final MessagingService messagingService;
    private final BroadcastService broadcastService;
    private final Collection<Member> bootstrapMembers;
    private final int heartbeatInterval;
    private final int phiFailureThreshold;
    private final int failureTimeout;
    private final AtomicBoolean started = new AtomicBoolean();
    private final StatefulMember localMember;

    /** Snapshot of the local metadata as of the last MEMBER_UPDATED event posted for the local member. */
    private volatile Map<String, String> localMetadata;

    private final Map<MemberId, StatefulMember> members = Maps.newConcurrentMap();
    private final Map<MemberId, PhiAccrualFailureDetector> failureDetectors = Maps.newConcurrentMap();
    private final Consumer<byte[]> broadcastListener = this::handleBroadcastMessage;
    private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(
            Threads.namedThreads("atomix-cluster-heartbeat-sender", LOGGER));
    private final ExecutorService heartbeatExecutor = Executors.newSingleThreadExecutor(
            Threads.namedThreads("atomix-cluster-heartbeat-receiver", LOGGER));

    /** Handle of the periodic heartbeat task; written by start() and read by stop(), possibly on different threads. */
    private volatile ScheduledFuture<?> heartbeatFuture;

    /**
     * Creates a new membership service.
     *
     * @param localMember the member representing this node; copied into a {@link StatefulMember}
     * @param bootstrapMembers members to send heartbeats to until they appear in the member map
     * @param messagingService service used for point-to-point heartbeats
     * @param broadcastService service used to broadcast the local member's identity
     * @param config source of the heartbeat interval, phi failure threshold and failure timeout
     * @throws NullPointerException if any argument is {@code null}
     */
    public DefaultClusterMembershipService(Member localMember, Collection<Member> bootstrapMembers, MessagingService messagingService, BroadcastService broadcastService, GroupMembershipConfig config) {
        this.messagingService = Preconditions.checkNotNull(messagingService, "messagingService cannot be null");
        this.broadcastService = Preconditions.checkNotNull(broadcastService, "broadcastService cannot be null");
        Preconditions.checkNotNull(localMember, "localMember cannot be null");
        // Fail fast: a null collection would otherwise only surface inside start().
        this.bootstrapMembers = Preconditions.checkNotNull(bootstrapMembers, "bootstrapMembers cannot be null");
        Preconditions.checkNotNull(config, "config cannot be null");
        this.localMember = new StatefulMember(localMember.id(), localMember.address(), localMember.zone(), localMember.rack(), localMember.host(), localMember.metadata());
        this.heartbeatInterval = config.getHeartbeatInterval();
        this.phiFailureThreshold = config.getPhiFailureThreshold();
        this.failureTimeout = config.getFailureTimeout();
    }

    @Override
    public Member getLocalMember() {
        return localMember;
    }

    /**
     * Returns an immutable snapshot of the members currently in the ACTIVE state.
     */
    @Override
    public Set<Member> getMembers() {
        return ImmutableSet.copyOf(members.values().stream()
                .filter(member -> member.getState() == Member.State.ACTIVE)
                .collect(Collectors.toList()));
    }

    /**
     * Returns the member with the given identifier, or {@code null} if it is unknown or
     * not currently ACTIVE.
     */
    @Override
    public Member getMember(MemberId memberId) {
        StatefulMember member = members.get(memberId);
        return member != null && member.getState() == Member.State.ACTIVE ? member : null;
    }

    /** Broadcasts the serialized local member. */
    private void broadcastIdentity() {
        broadcastService.broadcast(SERIALIZER.encode(localMember));
    }

    /**
     * Handles a broadcast identity: a previously unknown member is added to the member map,
     * announced with MEMBER_ADDED, and followed by a heartbeat round.
     */
    private void handleBroadcastMessage(byte[] message) {
        StatefulMember member = SERIALIZER.decode(message);
        if (members.putIfAbsent(member.id(), member) == null) {
            post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.MEMBER_ADDED, member));
            sendHeartbeats();
        }
    }

    /**
     * Sends one heartbeat to every known remote member and to every bootstrap member that
     * is not yet in the member map, then re-evaluates each target's liveness.
     *
     * @return a future completed once all heartbeat requests have finished; individual
     *     request failures are swallowed so they do not fail the returned future
     */
    private CompletableFuture<Void> sendHeartbeats() {
        Member localMember = getLocalMember();
        publishLocalMetadataChange(localMember);

        Stream<StatefulMember> clusterMembers = members.values().stream()
                .filter(member -> !member.id().equals(localMember.id()));
        Stream<StatefulMember> bootstrapMembers = this.bootstrapMembers.stream()
                .filter(member -> !member.id().equals(localMember.id()) && !members.containsKey(member.id()))
                .map(member -> new StatefulMember(member.id(), member.address(), member.zone(), member.rack(), member.host(), member.metadata()));

        byte[] payload = SERIALIZER.encode(new ClusterHeartbeat(localMember.id(), localMember.zone(), localMember.rack(), localMember.host(), localMember.metadata()));
        return Futures.allOf(Stream.concat(clusterMembers, bootstrapMembers).map(member -> {
            LOGGER.trace("{} - Sending heartbeat: {}", localMember.id(), member.id());
            CompletableFuture<Void> future = sendHeartbeat(member.address(), payload);
            updateMemberState(member);
            return future.exceptionally(v -> null);
        }).collect(Collectors.toList())).thenApply(v -> null);
    }

    /**
     * Posts MEMBER_UPDATED for the local member when its metadata differs from the last
     * published snapshot.
     *
     * <p>A copy of the metadata is stored rather than the map returned by
     * {@code metadata()}: if that map is mutated in place (not visible from this class),
     * comparing it against its own reference would never report a change.
     */
    private void publishLocalMetadataChange(Member localMember) {
        if (!localMember.metadata().equals(localMetadata)) {
            synchronized (this) {
                Map<String, String> current = localMember.metadata();
                // Re-check under the lock so concurrent callers post the event only once.
                if (!current.equals(localMetadata)) {
                    localMetadata = Maps.newHashMap(current);
                    post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.MEMBER_UPDATED, localMember));
                }
            }
        }
    }

    /**
     * Deactivates an ACTIVE member whose failure detector reports failure, or re-activates
     * an INACTIVE member whose detector no longer does.
     *
     * <p>A member counts as failed when phi reaches the configured threshold, or when phi
     * is exactly zero although a heartbeat has been recorded and it is older than the
     * failure timeout.
     */
    private void updateMemberState(StatefulMember member) {
        PhiAccrualFailureDetector failureDetector = failureDetectors.computeIfAbsent(member.id(), id -> new PhiAccrualFailureDetector());
        double phi = failureDetector.phi();
        boolean failed = phi >= phiFailureThreshold
                || (phi == 0.0
                        && failureDetector.lastUpdated() > 0L
                        && System.currentTimeMillis() - failureDetector.lastUpdated() > failureTimeout);
        if (failed) {
            if (member.getState() == Member.State.ACTIVE) {
                deactivateMember(member);
            }
        } else if (member.getState() == Member.State.INACTIVE) {
            activateMember(member);
        }
    }

    /**
     * Sends a single heartbeat and merges the member list from the response into the local
     * member map. Members learned this way are stored as INACTIVE; if any were new, another
     * heartbeat round is started.
     *
     * @param address the address to send the heartbeat to
     * @param payload the serialized {@link ClusterHeartbeat}
     * @return a future that always completes normally, whether or not the request succeeded
     */
    private CompletableFuture<Void> sendHeartbeat(Address address, byte[] payload) {
        return messagingService.sendAndReceive(address, HEARTBEAT_MESSAGE, payload).whenComplete((response, error) -> {
            if (error == null) {
                Collection<StatefulMember> remoteMembers = SERIALIZER.decode(response);
                boolean discovered = false;
                for (StatefulMember member : remoteMembers) {
                    member.setState(Member.State.INACTIVE);
                    if (members.putIfAbsent(member.id(), member) == null) {
                        discovered = true;
                    }
                }
                if (discovered) {
                    sendHeartbeats();
                }
            } else {
                LOGGER.debug("{} - Sending heartbeat to {} failed", localMember.id(), address, error);
            }
        }).exceptionally(e -> null).thenApply(v -> null);
    }

    /**
     * Handles an incoming heartbeat: records it with the sender's failure detector,
     * activates or updates the sender, and replies with all locally known members.
     *
     * @param address the address the heartbeat was received from
     * @param message the serialized {@link ClusterHeartbeat}
     * @return the serialized list of all members in the local member map
     */
    private byte[] handleHeartbeat(Address address, byte[] message) {
        ClusterHeartbeat heartbeat = SERIALIZER.decode(message);
        LOGGER.trace("{} - Received heartbeat: {}", localMember.id(), heartbeat.memberId());
        failureDetectors.computeIfAbsent(heartbeat.memberId(), id -> new PhiAccrualFailureDetector()).report();
        activateMember(new StatefulMember(heartbeat.memberId(), address, heartbeat.zone(), heartbeat.rack(), heartbeat.host(), heartbeat.metadata()));
        return SERIALIZER.encode(Lists.newArrayList(members.values()));
    }

    /**
     * Marks the given member ACTIVE and posts the matching event.
     *
     * <ul>
     *   <li>Unknown member: stored, MEMBER_ADDED posted, and a heartbeat sent to it.</li>
     *   <li>Known INACTIVE member: the stored instance is flipped to ACTIVE and MEMBER_ADDED posted.</li>
     *   <li>Known ACTIVE member with different metadata: replaced and MEMBER_UPDATED posted.</li>
     * </ul>
     */
    private synchronized void activateMember(StatefulMember member) {
        StatefulMember existingMember = members.get(member.id());
        if (existingMember == null) {
            member.setState(Member.State.ACTIVE);
            LOGGER.info("{} - Member activated: {}", localMember.id(), member);
            members.put(member.id(), member);
            post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.MEMBER_ADDED, member));
            sendHeartbeat(member.address(), SERIALIZER.encode(new ClusterHeartbeat(localMember.id(), localMember.zone(), localMember.rack(), localMember.host(), localMember.metadata())));
        } else if (existingMember.getState() == Member.State.INACTIVE) {
            LOGGER.info("{} - Member activated: {}", localMember.id(), existingMember);
            existingMember.setState(Member.State.ACTIVE);
            post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.MEMBER_ADDED, existingMember));
        } else if (!existingMember.metadata().equals(member.metadata())) {
            member.setState(Member.State.ACTIVE);
            LOGGER.info("{} - Member updated: {}", localMember.id(), member);
            members.put(member.id(), member);
            post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.MEMBER_UPDATED, member));
        }
    }

    /**
     * Marks the stored member with the given member's id INACTIVE and posts MEMBER_REMOVED,
     * if that member is known and currently ACTIVE. The entry stays in the member map.
     */
    private synchronized void deactivateMember(Member member) {
        StatefulMember existingMember = members.get(member.id());
        if (existingMember != null && existingMember.getState() == Member.State.ACTIVE) {
            LOGGER.info("{} - Member deactivated: {}", localMember.id(), existingMember);
            existingMember.setState(Member.State.INACTIVE);
            post(new ClusterMembershipEvent(ClusterMembershipEvent.Type.MEMBER_REMOVED, existingMember));
        }
    }

    /**
     * Starts the service: registers the broadcast listener and heartbeat handler, activates
     * the local member, performs an initial broadcast and heartbeat round, and schedules
     * periodic rounds at the configured heartbeat interval.
     *
     * @return a future completed with this service once the initial heartbeat round has
     *     finished, or an already-completed future holding {@code null} if the service was
     *     already started
     */
    @Override
    public CompletableFuture<ClusterMembershipService> start() {
        if (started.compareAndSet(false, true)) {
            broadcastService.addListener(broadcastListener);
            LOGGER.info("{} - Member activated: {}", localMember.id(), localMember);
            localMember.setState(Member.State.ACTIVE);
            members.put(localMember.id(), localMember);
            messagingService.registerHandler(HEARTBEAT_MESSAGE, this::handleHeartbeat, heartbeatExecutor);

            ComposableFuture<Void> future = new ComposableFuture<>();
            broadcastIdentity();
            sendHeartbeats().whenComplete((r, e) -> future.complete(null));

            heartbeatFuture = heartbeatScheduler.scheduleWithFixedDelay(() -> {
                // An exception escaping a periodic task suppresses all further executions,
                // which would silently stop heartbeats; log and keep the schedule alive.
                try {
                    broadcastIdentity();
                    sendHeartbeats();
                } catch (Exception e) {
                    LOGGER.warn("{} - Heartbeat round failed", localMember.id(), e);
                }
            }, 0L, heartbeatInterval, TimeUnit.MILLISECONDS);

            return future.thenApply(v -> {
                LOGGER.info("Started");
                return this;
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public boolean isRunning() {
        return started.get();
    }

    /**
     * Stops the service: unregisters the broadcast listener and heartbeat handler, cancels
     * the periodic heartbeat task, shuts down both executors, marks the local member
     * INACTIVE and clears the member map. Does nothing if the service is not running.
     *
     * @return an already-completed future
     */
    @Override
    public CompletableFuture<Void> stop() {
        if (started.compareAndSet(true, false)) {
            // Without this, broadcasts received after shutdown would repopulate the member
            // map and trigger further heartbeats.
            broadcastService.removeListener(broadcastListener);
            heartbeatScheduler.shutdownNow();
            LOGGER.info("{} - Member deactivated: {}", localMember.id(), localMember);
            localMember.setState(Member.State.INACTIVE);
            members.clear();
            // May still be null if stop() races with a start() that has not scheduled the task yet.
            ScheduledFuture<?> scheduled = heartbeatFuture;
            if (scheduled != null) {
                scheduled.cancel(true);
            }
            messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
            heartbeatExecutor.shutdownNow();
            LOGGER.info("Stopped");
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Kryo serializer that writes an {@link Address} as its host address string followed
     * by its port.
     */
    private static class AddressSerializer extends Serializer<Address> {
        @Override
        public void write(Kryo kryo, Output output, Address address) {
            output.writeString(address.address().getHostAddress());
            output.writeInt(address.port());
        }

        @Override
        public Address read(Kryo kryo, Input input, Class<Address> type) {
            String host = input.readString();
            int port = input.readInt();
            return Address.from(host, port);
        }
    }
}

