/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.primitive.partition.impl;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.primitive.partition.ManagedPartitionGroup;
import io.atomix.primitive.partition.ManagedPartitionGroupMembershipService;
import io.atomix.primitive.partition.MemberGroupStrategy;
import io.atomix.primitive.partition.PartitionGroup;
import io.atomix.primitive.partition.PartitionGroupConfig;
import io.atomix.primitive.partition.PartitionGroupMembership;
import io.atomix.primitive.partition.PartitionGroupMembershipEvent;
import io.atomix.primitive.partition.PartitionGroupMembershipEventListener;
import io.atomix.primitive.partition.PartitionGroupMembershipService;
import io.atomix.primitive.partition.PartitionGroupTypeRegistry;
import io.atomix.utils.Type;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.SingleThreadContext;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.config.ConfigurationException;
import io.atomix.utils.event.AbstractListenerManager;
import io.atomix.utils.event.Event;
import io.atomix.utils.event.EventListener;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.serializer.Namespaces;
import io.atomix.utils.serializer.Serializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Partition group membership service that discovers which cluster members participate in which
 * partition groups by exchanging {@link PartitionGroupInfo} messages on the
 * {@code "partition-group-bootstrap"} subject.
 *
 * <p>State mutations triggered by bootstrap responses, bootstrap requests and member removals are
 * all executed on the service's {@link ThreadContext}; readers observe the state through a
 * volatile field (system group) and a concurrent map (primitive groups).
 */
public class DefaultPartitionGroupMembershipService
extends AbstractListenerManager<PartitionGroupMembershipEvent, PartitionGroupMembershipEventListener>
implements ManagedPartitionGroupMembershipService {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPartitionGroupMembershipService.class);
    private static final String BOOTSTRAP_SUBJECT = "partition-group-bootstrap";
    /** Retry delays, in seconds, indexed by attempt; the last entry is reused once exhausted. */
    private static final int[] FIBONACCI_NUMBERS = new int[]{1, 1, 2, 3, 5};
    private final ClusterMembershipService membershipService;
    private final ClusterCommunicationService messagingService;
    private final PartitionGroupTypeRegistry groupTypeRegistry;
    private final Serializer serializer;
    /** May be {@code null} until a system group is configured locally or learned from a peer. */
    private volatile PartitionGroupMembership systemGroup;
    private final Map<String, PartitionGroupMembership> groups = Maps.newConcurrentMap();
    private final ClusterMembershipEventListener membershipEventListener = this::handleMembershipChange;
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile ThreadContext threadContext;

    /**
     * Creates the service, seeding the membership of each locally configured group with the local
     * member only, and building the serializer used for bootstrap messages.
     *
     * @param membershipService the cluster membership service
     * @param messagingService the cluster communication service used for bootstrap messages
     * @param systemGroup the locally configured system group, or {@code null} if none
     * @param groups the locally configured primitive partition groups
     * @param groupTypeRegistry registry of known partition group types
     */
    public DefaultPartitionGroupMembershipService(ClusterMembershipService membershipService, ClusterCommunicationService messagingService, ManagedPartitionGroup systemGroup, Collection<ManagedPartitionGroup> groups, PartitionGroupTypeRegistry groupTypeRegistry) {
        this.membershipService = membershipService;
        this.messagingService = messagingService;
        this.groupTypeRegistry = groupTypeRegistry;
        this.systemGroup = systemGroup != null
            ? new PartitionGroupMembership(systemGroup.name(), systemGroup.config(), ImmutableSet.of(membershipService.getLocalMember().id()), true)
            : null;
        groups.forEach(group -> this.groups.put(
            group.name(),
            new PartitionGroupMembership(group.name(), group.config(), ImmutableSet.of(membershipService.getLocalMember().id()), false)));

        // The registration order below is kept exactly as before.
        Namespace.Builder namespaceBuilder = Namespace.builder()
            .register(Namespaces.BASIC)
            .register(MemberId.class)
            .register(PartitionGroupMembership.class)
            .register(PartitionGroupInfo.class)
            .register(PartitionGroupConfig.class)
            .register(MemberGroupStrategy.class);

        // Group types are registered in name order rather than in registry iteration order.
        ArrayList<PartitionGroup.Type> groupTypes = Lists.newArrayList(groupTypeRegistry.getGroupTypes());
        groupTypes.sort(Comparator.comparing(PartitionGroup.Type::name));
        for (PartitionGroup.Type groupType : groupTypes) {
            namespaceBuilder.register(groupType.getClass());
            namespaceBuilder.register(groupType.newConfig().getClass());
        }
        this.serializer = Serializer.using(namespaceBuilder.build());
    }

    /**
     * Returns the current system group membership.
     *
     * @return the system group membership, or {@code null} if none is known yet
     */
    @Override
    public PartitionGroupMembership getSystemMembership() {
        return this.systemGroup;
    }

    /**
     * Looks up the membership of the named group, checking primitive groups first and then the
     * system group.
     *
     * @param group the group name
     * @return the matching membership, or {@code null} if the name matches no known group
     */
    @Override
    public PartitionGroupMembership getMembership(String group) {
        PartitionGroupMembership membership = this.groups.get(group);
        if (membership != null) {
            return membership;
        }
        // Read the volatile field once; it is legitimately null until a system group is known.
        PartitionGroupMembership system = this.systemGroup;
        return system != null && system.group().equals(group) ? system : null;
    }

    /**
     * Returns the memberships of the primitive partition groups (the system group is not included).
     *
     * @return the values view of the internal group map
     */
    @Override
    public Collection<PartitionGroupMembership> getMemberships() {
        return this.groups.values();
    }

    /**
     * Reacts to cluster membership changes: a newly added member is asked for its partition group
     * info; a removed member is dropped from every membership on the service thread.
     */
    private void handleMembershipChange(ClusterMembershipEvent event) {
        if (event.type() == ClusterMembershipEvent.Type.MEMBER_ADDED) {
            this.bootstrap(event.subject());
        } else if (event.type() == ClusterMembershipEvent.Type.MEMBER_REMOVED) {
            MemberId removedId = event.subject().id();
            this.threadContext.execute(() -> this.removeMember(removedId));
        }
    }

    /** Removes the given member from the system group and from every primitive group containing it. */
    private void removeMember(MemberId removedId) {
        PartitionGroupMembership system = this.systemGroup;
        if (system != null && system.members().contains(removedId)) {
            PartitionGroupMembership updated = new PartitionGroupMembership(
                system.group(), system.config(), withoutMember(system.members(), removedId), true);
            this.systemGroup = updated;
            this.post(new PartitionGroupMembershipEvent(PartitionGroupMembershipEvent.Type.MEMBERS_CHANGED, updated));
        }
        this.groups.values().forEach(group -> {
            if (group.members().contains(removedId)) {
                PartitionGroupMembership updated = new PartitionGroupMembership(
                    group.group(), group.config(), withoutMember(group.members(), removedId), false);
                this.groups.put(group.group(), updated);
                this.post(new PartitionGroupMembershipEvent(PartitionGroupMembershipEvent.Type.MEMBERS_CHANGED, updated));
            }
        });
    }

    /** Returns an immutable copy of {@code members} with {@code removed} taken out. */
    private static Set<MemberId> withoutMember(Set<MemberId> members, MemberId removed) {
        Set<MemberId> remaining = Sets.newHashSet(members);
        remaining.remove(removed);
        return ImmutableSet.copyOf(remaining);
    }

    /** Starts the cluster-wide bootstrap at attempt zero. */
    private CompletableFuture<Void> bootstrap() {
        return this.bootstrap(0, new CompletableFuture<Void>());
    }

    /**
     * Bootstraps from every other known member, retrying with increasing delay until both a system
     * group and at least one primitive group are known.
     *
     * @param attempt zero-based attempt counter used to pick the retry delay
     * @param future the future completed once bootstrap succeeds or fails
     * @return {@code future}
     */
    private CompletableFuture<Void> bootstrap(int attempt, CompletableFuture<Void> future) {
        Futures.allOf(this.membershipService.getMembers().stream()
            .filter(node -> !node.id().equals(this.membershipService.getLocalMember().id()))
            .map(node -> this.bootstrap(node))
            .collect(Collectors.toList()))
            .whenComplete((result, error) -> {
                if (error != null) {
                    future.completeExceptionally(error);
                } else if (this.systemGroup == null) {
                    LOGGER.warn("Failed to locate system partition group. Retrying...");
                    this.scheduleBootstrapRetry(attempt, future);
                } else if (this.groups.isEmpty()) {
                    LOGGER.warn("Failed to locate primitive partition group(s). Retrying...");
                    this.scheduleBootstrapRetry(attempt, future);
                } else {
                    future.complete(null);
                }
            });
        return future;
    }

    /** Schedules the next cluster-wide bootstrap attempt after the delay configured for {@code attempt}. */
    private void scheduleBootstrapRetry(int attempt, CompletableFuture<Void> future) {
        int delaySeconds = FIBONACCI_NUMBERS[Math.min(attempt, FIBONACCI_NUMBERS.length - 1)];
        this.threadContext.schedule(Duration.ofSeconds(delaySeconds), () -> this.bootstrap(attempt + 1, future));
    }

    /** Bootstraps from a single member using a fresh future. */
    private CompletableFuture<Void> bootstrap(Member member) {
        return this.bootstrap(member, new CompletableFuture<Void>());
    }

    /**
     * Sends the local partition group info to {@code member} and merges the reply into local state.
     *
     * <p>A missing remote handler or a timeout is retried after one second. Any other messaging
     * failure is logged at debug level and the future is completed normally, so that one
     * unreachable peer does not fail the whole bootstrap. A failure while merging the reply
     * completes the future exceptionally.
     *
     * @param member the member to bootstrap from
     * @param future the future to complete
     * @return {@code future}
     */
    private CompletableFuture<Void> bootstrap(Member member, CompletableFuture<Void> future) {
        LOGGER.debug("{} - Bootstrapping from member {}", this.membershipService.getLocalMember().id(), member);
        this.messagingService.<PartitionGroupInfo, PartitionGroupInfo>send(
            BOOTSTRAP_SUBJECT,
            new PartitionGroupInfo(this.membershipService.getLocalMember().id(), this.systemGroup, Lists.newArrayList(this.groups.values())),
            this.serializer::encode,
            this.serializer::decode,
            member.id())
            .whenCompleteAsync((info, error) -> {
                if (error == null) {
                    try {
                        this.updatePartitionGroups(info);
                        future.complete(null);
                    }
                    catch (Exception e) {
                        future.completeExceptionally(e);
                    }
                    return;
                }
                Throwable cause = Throwables.getRootCause(error);
                if (cause instanceof MessagingException.NoRemoteHandler || cause instanceof TimeoutException) {
                    this.threadContext.schedule(Duration.ofSeconds(1L), () -> this.bootstrap(member, future));
                } else {
                    LOGGER.debug("{} - Failed to bootstrap from member {}", this.membershipService.getLocalMember().id(), member, cause);
                    future.complete(null);
                }
            }, this.threadContext);
        return future;
    }

    /**
     * Merges partition group info received from another member into the local view, posting a
     * {@code MEMBERS_CHANGED} event for every membership that is created or gains members.
     *
     * @param info the remote member's partition group info
     * @throws ConfigurationException if a remote group conflicts in name or type with a local one
     */
    private void updatePartitionGroups(PartitionGroupInfo info) {
        PartitionGroupMembership localSystem = this.systemGroup;
        PartitionGroupMembership remoteSystem = info.systemGroup;
        if (localSystem == null && remoteSystem != null) {
            this.systemGroup = remoteSystem;
            this.post(new PartitionGroupMembershipEvent(PartitionGroupMembershipEvent.Type.MEMBERS_CHANGED, remoteSystem));
            LOGGER.debug("{} - Bootstrapped system group {} from {}", this.membershipService.getLocalMember().id(), remoteSystem, info.memberId);
        } else if (localSystem != null && remoteSystem != null) {
            if (!hasSameNameAndType(localSystem, remoteSystem)) {
                throw new ConfigurationException("Duplicate system group detected");
            }
            Set<MemberId> merged = this.mergeKnownMembers(localSystem.members(), remoteSystem.members());
            // Only additions trigger an update here; removals are handled by membership events.
            if (!Sets.difference(merged, localSystem.members()).isEmpty()) {
                PartitionGroupMembership updated = new PartitionGroupMembership(
                    localSystem.group(), localSystem.config(), ImmutableSet.copyOf(merged), true);
                this.systemGroup = updated;
                this.post(new PartitionGroupMembershipEvent(PartitionGroupMembershipEvent.Type.MEMBERS_CHANGED, updated));
                LOGGER.debug("{} - Updated system group {} from {}", this.membershipService.getLocalMember().id(), updated, info.memberId);
            }
        }

        for (PartitionGroupMembership remote : info.groups) {
            PartitionGroupMembership local = this.groups.get(remote.group());
            if (local == null) {
                this.groups.put(remote.group(), remote);
                this.post(new PartitionGroupMembershipEvent(PartitionGroupMembershipEvent.Type.MEMBERS_CHANGED, remote));
                LOGGER.debug("{} - Bootstrapped partition group {} from {}", this.membershipService.getLocalMember().id(), remote, info.memberId);
                continue;
            }
            if (!hasSameNameAndType(local, remote)) {
                throw new ConfigurationException("Duplicate partition group " + remote.group() + " detected");
            }
            Set<MemberId> merged = this.mergeKnownMembers(local.members(), remote.members());
            if (Sets.difference(merged, local.members()).isEmpty()) {
                continue;
            }
            PartitionGroupMembership updated = new PartitionGroupMembership(
                local.group(), local.config(), ImmutableSet.copyOf(merged), false);
            this.groups.put(local.group(), updated);
            this.post(new PartitionGroupMembershipEvent(PartitionGroupMembershipEvent.Type.MEMBERS_CHANGED, updated));
            LOGGER.debug("{} - Updated partition group {} from {}", this.membershipService.getLocalMember().id(), updated, info.memberId);
        }
    }

    /** Returns whether both memberships refer to the same group name and the same group type name. */
    private static boolean hasSameNameAndType(PartitionGroupMembership first, PartitionGroupMembership second) {
        return first.group().equals(second.group())
            && first.config().getType().name().equals(second.config().getType().name());
    }

    /** Unions both member sets, keeping only ids the cluster membership service still resolves. */
    private Set<MemberId> mergeKnownMembers(Set<MemberId> local, Set<MemberId> remote) {
        return Stream.concat(local.stream(), remote.stream())
            .filter(memberId -> this.membershipService.getMember(memberId) != null)
            .collect(Collectors.toSet());
    }

    /**
     * Handles a bootstrap request from a peer: merges the peer's info and replies with the local
     * view. A merge failure is logged and does not prevent the reply.
     */
    private PartitionGroupInfo handleBootstrap(PartitionGroupInfo info) {
        try {
            this.updatePartitionGroups(info);
        }
        catch (Exception e) {
            // Pass the exception as the trailing argument so the stack trace is logged too.
            LOGGER.warn("{}", e.getMessage(), e);
        }
        return new PartitionGroupInfo(this.membershipService.getLocalMember().id(), this.systemGroup, Lists.newArrayList(this.groups.values()));
    }

    /**
     * Starts the service: creates the service thread, registers the membership listener and the
     * bootstrap message handler, then bootstraps from the other cluster members.
     *
     * @return a future completed with this service once bootstrap has finished
     */
    @Override
    public CompletableFuture<PartitionGroupMembershipService> start() {
        this.threadContext = new SingleThreadContext(Threads.namedThreads("atomix-partition-group-membership-service-%d", LOGGER));
        this.membershipService.addListener(this.membershipEventListener);
        this.messagingService.<PartitionGroupInfo, PartitionGroupInfo>subscribe(
            BOOTSTRAP_SUBJECT, this.serializer::decode, this::handleBootstrap, this.serializer::encode, this.threadContext);
        return this.bootstrap().thenApply(v -> {
            LOGGER.info("Started");
            this.started.set(true);
            return this;
        });
    }

    /**
     * Returns whether the service has completed startup and has not been stopped since.
     *
     * @return {@code true} if the service is running
     */
    @Override
    public boolean isRunning() {
        return this.started.get();
    }

    /**
     * Stops the service: removes the membership listener, unsubscribes from bootstrap messages and
     * closes the service thread if one was created.
     *
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> stop() {
        this.membershipService.removeListener(this.membershipEventListener);
        this.messagingService.unsubscribe(BOOTSTRAP_SUBJECT);
        ThreadContext threadContext = this.threadContext;
        if (threadContext != null) {
            threadContext.close();
        }
        LOGGER.info("Stopped");
        this.started.set(false);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Bootstrap message payload: the sender's id plus its view of the system group and primitive
     * groups. Field names are kept as-is because the class is registered with the serializer.
     */
    private static class PartitionGroupInfo {
        private final MemberId memberId;
        private final PartitionGroupMembership systemGroup;
        private final Collection<PartitionGroupMembership> groups;

        PartitionGroupInfo(MemberId memberId, PartitionGroupMembership systemGroup, Collection<PartitionGroupMembership> groups) {
            this.memberId = memberId;
            this.systemGroup = systemGroup;
            this.groups = groups;
        }
    }
}

