/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCacheSelector;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionState;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperLeaderElectionDriver
implements LeaderElectionDriver,
LeaderLatchListener {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionDriver.class);
    private final CuratorFramework curatorFramework;
    private final LeaderElectionDriver.Listener leaderElectionListener;
    private final String leaderLatchPath;
    private final LeaderLatch leaderLatch;
    private final TreeCache treeCache;
    private final ConnectionStateListener listener = (client, newState) -> this.handleStateChange(newState);
    private AtomicBoolean running = new AtomicBoolean(true);

    public ZooKeeperLeaderElectionDriver(CuratorFramework curatorFramework, LeaderElectionDriver.Listener leaderElectionListener) throws Exception {
        this.curatorFramework = Preconditions.checkNotNull(curatorFramework);
        this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);
        this.leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(curatorFramework.getNamespace());
        this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());
        this.treeCache = ZooKeeperUtils.createTreeCache(curatorFramework, "/", new ConnectionInfoNodeSelector());
        this.treeCache.getListenable().addListener((client, event) -> {
            switch (event.getType()) {
                case NODE_ADDED: 
                case NODE_UPDATED: {
                    Preconditions.checkNotNull(event.getData(), "The ZooKeeper event data must not be null.");
                    this.handleChangedLeaderInformation(event.getData());
                    break;
                }
                case NODE_REMOVED: {
                    Preconditions.checkNotNull(event.getData(), "The ZooKeeper event data must not be null.");
                    this.handleRemovedLeaderInformation(event.getData().getPath());
                }
            }
        });
        this.leaderLatch.addListener((LeaderLatchListener)this);
        curatorFramework.getConnectionStateListenable().addListener((Object)this.listener);
        this.leaderLatch.start();
        this.treeCache.start();
    }

    @Override
    public void close() throws Exception {
        if (this.running.compareAndSet(true, false)) {
            LOG.info("Closing {}.", (Object)this);
            this.curatorFramework.getConnectionStateListenable().removeListener((Object)this.listener);
            Exception exception = null;
            try {
                this.treeCache.close();
            }
            catch (Exception e) {
                exception = e;
            }
            try {
                this.leaderLatch.close();
            }
            catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
            ExceptionUtils.tryRethrowException(exception);
        }
    }

    @Override
    public boolean hasLeadership() {
        return this.leaderLatch.hasLeadership();
    }

    @Override
    public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) {
        Preconditions.checkState(this.running.get());
        if (!this.leaderLatch.hasLeadership()) {
            return;
        }
        String connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(componentId);
        LOG.debug("Write leader information {} for component '{}' to {}.", new Object[]{leaderInformation, componentId, ZooKeeperUtils.generateZookeeperPath(this.curatorFramework.getNamespace(), connectionInformationPath)});
        try {
            ZooKeeperUtils.writeLeaderInformationToZooKeeper(leaderInformation, this.curatorFramework, () -> ((LeaderLatch)this.leaderLatch).hasLeadership(), connectionInformationPath);
        }
        catch (Exception e) {
            this.leaderElectionListener.onError(e);
        }
    }

    @Override
    public void deleteLeaderInformation(String componentId) {
        try {
            ZooKeeperUtils.deleteZNode(this.curatorFramework, ZooKeeperUtils.generateZookeeperPath(componentId));
        }
        catch (Exception e) {
            this.leaderElectionListener.onError(e);
        }
    }

    private void handleStateChange(ConnectionState newState) {
        switch (newState) {
            case CONNECTED: {
                LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");
                break;
            }
            case SUSPENDED: {
                LOG.warn("Connection to ZooKeeper suspended, waiting for reconnection.");
                break;
            }
            case RECONNECTED: {
                LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");
                break;
            }
            case LOST: {
                LOG.warn("Connection to ZooKeeper lost. None of the contenders participates in the leader election anymore.");
            }
        }
    }

    public void isLeader() {
        UUID leaderSessionID = UUID.randomUUID();
        LOG.debug("{} obtained the leadership with session ID {}.", (Object)this, (Object)leaderSessionID);
        this.leaderElectionListener.onGrantLeadership(leaderSessionID);
    }

    public void notLeader() {
        LOG.debug("{} lost the leadership.", (Object)this);
        this.leaderElectionListener.onRevokeLeadership();
    }

    private void handleChangedLeaderInformation(ChildData childData) {
        if (this.shouldHandleLeaderInformationEvent(childData.getPath())) {
            String componentId = this.extractComponentId(childData.getPath());
            LeaderInformation leaderInformation = this.tryReadingLeaderInformation(childData, componentId);
            this.leaderElectionListener.onLeaderInformationChange(componentId, leaderInformation);
        }
    }

    private String extractComponentId(String path) {
        String[] splits = ZooKeeperUtils.splitZooKeeperPath(path);
        Preconditions.checkState(splits.length >= 2, String.format("Expecting path consisting of /<component-id>/connection_info. Got path '%s'", path));
        return splits[splits.length - 2];
    }

    private void handleRemovedLeaderInformation(String removedNodePath) {
        if (this.shouldHandleLeaderInformationEvent(removedNodePath)) {
            String leaderName = this.extractComponentId(removedNodePath);
            this.leaderElectionListener.onLeaderInformationChange(leaderName, LeaderInformation.empty());
        }
    }

    private boolean shouldHandleLeaderInformationEvent(String path) {
        return this.running.get() && this.leaderLatch.hasLeadership() && ZooKeeperUtils.isConnectionInfoPath(path);
    }

    private LeaderInformation tryReadingLeaderInformation(ChildData childData, String id) {
        LeaderInformation leaderInformation;
        try {
            leaderInformation = ZooKeeperUtils.readLeaderInformation(childData.getData());
            LOG.debug("Leader information for {} has changed to {}.", (Object)id, (Object)leaderInformation);
        }
        catch (IOException | ClassNotFoundException e) {
            LOG.debug("Could not read leader information for {}. Rewriting the information.", (Object)id, (Object)e);
            leaderInformation = LeaderInformation.empty();
        }
        return leaderInformation;
    }

    public String toString() {
        return String.format("%s{leaderLatchPath='%s'}", this.getClass().getSimpleName(), this.leaderLatchPath);
    }

    private static class ConnectionInfoNodeSelector
    implements TreeCacheSelector {
        private ConnectionInfoNodeSelector() {
        }

        public boolean traverseChildren(String fullPath) {
            return true;
        }

        public boolean acceptChild(String fullPath) {
            return !fullPath.endsWith(ZooKeeperUtils.getLeaderLatchPath());
        }
    }
}

