/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.client.impl;

import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.camunda.zeebe.broker.client.api.BrokerClusterState;
import io.camunda.zeebe.broker.client.api.BrokerTopologyListener;
import io.camunda.zeebe.broker.client.api.BrokerTopologyManager;
import io.camunda.zeebe.broker.client.impl.BrokerClientTopologyMetrics;
import io.camunda.zeebe.broker.client.impl.BrokerClusterStateImpl;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationUpdateNotifier;
import io.camunda.zeebe.dynamic.config.state.ClusterConfiguration;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.protocol.record.PartitionHealthStatus;
import io.camunda.zeebe.scheduler.Actor;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps a {@link BrokerClusterStateImpl} up to date from cluster membership events and cluster
 * configuration updates, and notifies registered {@link BrokerTopologyListener}s when brokers are
 * added or removed.
 *
 * <p>Every topology change is scheduled through {@code actor.run(..)}: the current state is used to
 * construct a new {@link BrokerClusterStateImpl}, the change is applied to that new instance, and
 * the new instance is then stored in the {@code volatile} {@link #topology} field. Within this
 * class, {@link #topologyListeners} is only read or modified from such {@code actor.run(..)}
 * callbacks.
 */
public final class BrokerTopologyManagerImpl
        extends Actor
        implements BrokerTopologyManager,
                ClusterMembershipEventListener,
                ClusterConfigurationUpdateNotifier.ClusterConfigurationUpdateListener {
    private static final Logger LOG = LoggerFactory.getLogger(BrokerTopologyManagerImpl.class);

    /**
     * Value of {@code getLeaderForPartition} for which no leader metric is recorded.
     * NOTE(review): presumably mirrors {@code BrokerClusterState.NODE_ID_NULL} — confirm and
     * reference that constant directly if so.
     */
    private static final int NODE_ID_NULL = -2;

    private volatile BrokerClusterStateImpl topology = new BrokerClusterStateImpl();
    private volatile ClusterConfiguration clusterConfiguration = ClusterConfiguration.uninitialized();
    private final Supplier<Set<Member>> membersSupplier;
    private final BrokerClientTopologyMetrics topologyMetrics = new BrokerClientTopologyMetrics();
    private final Set<BrokerTopologyListener> topologyListeners = new HashSet<>();

    /**
     * @param membersSupplier supplies the currently known cluster members; it is queried once when
     *     the actor starts (see {@link #onActorStarted()})
     */
    public BrokerTopologyManagerImpl(Supplier<Set<Member>> membersSupplier) {
        this.membersSupplier = membersSupplier;
    }

    /** Returns the most recently stored topology state. */
    @Override
    public BrokerClusterState getTopology() {
        return this.topology;
    }

    /** Returns the last cluster configuration accepted by {@link #onClusterConfigurationUpdated}. */
    @Override
    public ClusterConfiguration getClusterConfiguration() {
        return this.clusterConfiguration;
    }

    /**
     * Registers {@code listener} and immediately calls {@code brokerAdded} on it once for every
     * broker id in the current topology, so that it also learns about brokers that were added
     * before it was registered.
     *
     * @param listener the listener to register
     */
    @Override
    public void addTopologyListener(BrokerTopologyListener listener) {
        this.actor.run(() -> {
            this.topologyListeners.add(listener);
            // Broker ids are converted to member ids via their decimal string form.
            this.topology.getBrokers().stream()
                    .map(brokerId -> MemberId.from(String.valueOf(brokerId)))
                    .forEach(listener::brokerAdded);
        });
    }

    /**
     * Unregisters {@code listener}; it receives no further notifications once the removal has run.
     *
     * @param listener the listener to remove
     */
    @Override
    public void removeTopologyListener(BrokerTopologyListener listener) {
        this.actor.run(() -> this.topologyListeners.remove(listener));
    }

    /**
     * Schedules a topology change: builds a new state from the current one, lets {@code updater}
     * modify it, stores it in {@link #topology} and refreshes the metrics from it.
     *
     * @param updater mutation to apply to the new state before it is stored
     */
    private void updateTopology(Consumer<BrokerClusterStateImpl> updater) {
        this.actor.run(() -> {
            BrokerClusterStateImpl updated = new BrokerClusterStateImpl(this.topology);
            updater.accept(updated);
            this.topology = updated;
            this.updateMetrics(updated);
        });
    }

    /**
     * Adds every member currently returned by the members supplier whose properties yield a
     * {@link BrokerInfo}. Does nothing when the supplier returns {@code null} or an empty set.
     */
    private void checkForMissingEvents() {
        Set<Member> members = this.membersSupplier.get();
        if (members == null || members.isEmpty()) {
            return;
        }
        this.updateTopology(updated -> {
            for (Member member : members) {
                BrokerInfo brokerInfo = BrokerInfo.fromProperties(member.properties());
                if (brokerInfo == null) {
                    // No broker info in the member's properties: skip this member.
                    continue;
                }
                this.addBroker(updated, member, brokerInfo);
            }
        });
    }

    /**
     * Adds the broker to {@code topology} if it is absent, notifying listeners only in that case,
     * and then always applies the broker's advertised properties.
     */
    private void addBroker(BrokerClusterStateImpl topology, Member member, BrokerInfo brokerInfo) {
        if (topology.addBrokerIfAbsent(brokerInfo.getNodeId())) {
            this.topologyListeners.forEach(listener -> listener.brokerAdded(member.id()));
        }
        this.processProperties(topology, brokerInfo);
    }

    /** Removes the broker from {@code topology} and notifies all listeners of the removal. */
    private void removeBroker(BrokerClusterStateImpl topology, Member member, BrokerInfo brokerInfo) {
        topology.removeBroker(brokerInfo.getNodeId());
        this.topologyListeners.forEach(listener -> listener.brokerRemoved(member.id()));
    }

    /** Returns the fixed actor name {@code "GatewayTopologyManager"}. */
    @Override
    public String getName() {
        return "GatewayTopologyManager";
    }

    /**
     * Seeds the topology from the members supplier when the actor starts.
     * NOTE(review): presumably this catches up on members whose membership events were emitted
     * before this listener was registered — confirm against the registration site.
     */
    @Override
    protected void onActorStarted() {
        this.checkForMissingEvents();
    }

    /**
     * Handles a cluster membership event. Events whose subject has no {@link BrokerInfo} in its
     * properties are ignored. {@code MEMBER_ADDED} and {@code METADATA_CHANGED} both add-or-update
     * the broker, {@code MEMBER_REMOVED} removes it, and any other event type is only logged.
     *
     * @param event the membership event to process
     */
    @Override
    public void event(ClusterMembershipEvent event) {
        Member subject = event.subject();
        ClusterMembershipEvent.Type eventType = event.type();
        BrokerInfo brokerInfo = BrokerInfo.fromProperties(subject.properties());
        if (brokerInfo == null) {
            return;
        }
        switch (eventType) {
            case MEMBER_ADDED: {
                LOG.debug("Received new broker {}.", brokerInfo);
                this.updateTopology(updated -> this.addBroker(updated, subject, brokerInfo));
                break;
            }
            case METADATA_CHANGED: {
                LOG.debug(
                        "Received metadata change from Broker {}, partitions {}, terms {} and health {}.",
                        brokerInfo.getNodeId(),
                        brokerInfo.getPartitionRoles(),
                        brokerInfo.getPartitionLeaderTerms(),
                        brokerInfo.getPartitionHealthStatuses());
                this.updateTopology(updated -> this.addBroker(updated, subject, brokerInfo));
                break;
            }
            case MEMBER_REMOVED: {
                LOG.debug("Received broker was removed {}.", brokerInfo);
                this.updateTopology(updated -> this.removeBroker(updated, subject, brokerInfo));
                break;
            }
            default: {
                LOG.debug("Received {} for broker {}, do nothing.", eventType, brokerInfo.getNodeId());
            }
        }
    }

    /**
     * Copies the information advertised by one broker into {@code topology}: cluster dimensions
     * (only while the topology reports itself as not initialized), partition roles, partition
     * health, command API address (when non-null) and version.
     */
    private void processProperties(BrokerClusterStateImpl topology, BrokerInfo distributedBrokerInfo) {
        if (!topology.isInitialized()) {
            topology.setClusterSize(distributedBrokerInfo.getClusterSize());
            topology.setPartitionsCount(distributedBrokerInfo.getPartitionsCount());
            topology.setReplicationFactor(distributedBrokerInfo.getReplicationFactor());
        }
        int nodeId = distributedBrokerInfo.getNodeId();
        // Runs before the roles are applied below; NOTE(review): presumably drops partitions this
        // broker no longer reports — confirm against BrokerClusterStateImpl.syncPartitions.
        topology.syncPartitions(nodeId, distributedBrokerInfo.getPartitionRoles().keySet());
        distributedBrokerInfo.consumePartitions(
                topology::addPartitionIfAbsent,
                (leaderPartitionId, term) -> topology.setPartitionLeader(leaderPartitionId, nodeId, term),
                followerPartitionId -> topology.addPartitionFollower(followerPartitionId, nodeId),
                inactivePartitionId -> topology.addPartitionInactive(inactivePartitionId, nodeId));
        distributedBrokerInfo.consumePartitionsHealth(
                (partition, health) -> topology.setPartitionHealthStatus(nodeId, partition, health));
        String clientAddress = distributedBrokerInfo.getCommandApiAddress();
        if (clientAddress != null) {
            topology.setBrokerAddressIfPresent(nodeId, clientAddress);
        }
        topology.setBrokerVersionIfPresent(nodeId, distributedBrokerInfo.getVersion());
    }

    /**
     * Publishes leader and follower metrics for every partition in {@code topology}. The leader
     * metric is skipped when the reported leader equals {@link #NODE_ID_NULL}; follower metrics are
     * skipped when the follower set is {@code null}.
     */
    private void updateMetrics(BrokerClusterStateImpl topology) {
        List<Integer> partitions = topology.getPartitions();
        partitions.forEach(partition -> {
            int leader = topology.getLeaderForPartition(partition);
            if (leader != NODE_ID_NULL) {
                this.topologyMetrics.setLeaderForPartition(partition, leader);
            }
            Set<Integer> followers = topology.getFollowersForPartition(partition);
            if (followers != null) {
                followers.forEach(broker -> this.topologyMetrics.setFollower(partition, broker));
            }
        });
    }

    /**
     * Stores the new cluster configuration and, if its cluster size, partition count or minimum
     * replication factor differs from the current topology, schedules an update of those three
     * values. Uninitialized configurations are ignored entirely.
     *
     * @param clusterTopology the updated cluster configuration
     */
    @Override
    public void onClusterConfigurationUpdated(ClusterConfiguration clusterTopology) {
        if (clusterTopology.isUninitialized()) {
            return;
        }
        // Stored right away by the calling thread; only the topology update below goes through
        // actor.run.
        this.clusterConfiguration = clusterTopology;
        this.updateTopology(topologyToUpdate -> {
            int newClusterSize = clusterTopology.clusterSize();
            int newPartitionsCount = clusterTopology.partitionCount();
            Integer newReplicationFactor = clusterTopology.minReplicationFactor();
            // intValue() keeps this a numeric comparison regardless of the getter's declared type.
            if (newClusterSize != topologyToUpdate.getClusterSize()
                    || newPartitionsCount != topologyToUpdate.getPartitionsCount()
                    || newReplicationFactor.intValue() != topologyToUpdate.getReplicationFactor()) {
                LOG.debug(
                        "Updating topology with clusterSize {}, partitionsCount {} and replicationFactor {}",
                        newClusterSize,
                        newPartitionsCount,
                        newReplicationFactor);
                topologyToUpdate.setClusterSize(newClusterSize);
                topologyToUpdate.setPartitionsCount(newPartitionsCount);
                topologyToUpdate.setReplicationFactor(newReplicationFactor);
            }
        });
    }
}

