/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.clustercontroller.core.database;

import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.FleetControllerContext;
import com.yahoo.vespa.clustercontroller.core.database.CasWriteFailed;
import com.yahoo.vespa.clustercontroller.core.database.Database;
import com.yahoo.vespa.clustercontroller.core.database.MasterDataGatherer;
import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperPaths;
import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec;
import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

public class ZooKeeperDatabase
extends Database {
    private static final Logger log = Logger.getLogger(ZooKeeperDatabase.class.getName());
    private static final Charset utf8 = StandardCharsets.UTF_8;
    private static final List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
    private final ZooKeeperPaths paths;
    private final Database.DatabaseListener listener;
    private final ZooKeeperWatcher watcher = new ZooKeeperWatcher();
    private final ZooKeeper session;
    private boolean sessionOpen = true;
    private final FleetControllerContext context;
    private final MasterDataGatherer masterDataGatherer;
    private int lastKnownStateBundleZNodeVersion = -2;
    private int lastKnownStateVersionZNodeVersion = -2;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ZooKeeperDatabase(FleetControllerContext context, String address, int timeout, Database.DatabaseListener zksl) throws IOException, KeeperException, InterruptedException {
        this.context = context;
        this.paths = new ZooKeeperPaths(context.id());
        this.session = new ZooKeeper(address, timeout, (Watcher)this.watcher, new ZkClientConfigBuilder().toConfig());
        boolean completedOk = false;
        try {
            this.listener = zksl;
            this.setupRoot();
            context.log(log, Level.FINEST, "Asking for initial data on master election");
            this.masterDataGatherer = new MasterDataGatherer(this.session, this.paths, this.listener, context.id().index());
            completedOk = true;
        }
        finally {
            if (!completedOk) {
                this.session.close();
            }
        }
    }

    private void createNode(String path, byte[] value) throws KeeperException, InterruptedException {
        try {
            if (this.session.exists(path, false) != null) {
                this.context.log(log, Level.FINE, () -> "Zookeeper node '" + path + "' already exists. Not creating it");
                return;
            }
            this.session.create(path, value, acl, CreateMode.PERSISTENT);
            this.context.log(log, Level.FINE, () -> "Created zookeeper node '" + path + "'");
        }
        catch (KeeperException.NodeExistsException e) {
            this.context.log(log, Level.FINE, "Node to create existed, but this is normal as other nodes may create them at the same time.");
        }
    }

    private void setupRoot() throws KeeperException, InterruptedException {
        String[] pathElements = this.paths.root().substring(1).split("/");
        Object path = "";
        for (String elem : pathElements) {
            path = (String)path + "/" + elem;
            this.createNode((String)path, new byte[0]);
        }
        this.createNode(this.paths.indexesRoot(), new byte[0]);
        this.createNode(this.paths.wantedStates(), new byte[0]);
        this.createNode(this.paths.startTimestamps(), new byte[0]);
        this.createNode(this.paths.latestVersion(), Integer.valueOf(0).toString().getBytes(utf8));
        this.createNode(this.paths.publishedStateBundle(), new byte[0]);
        byte[] val = String.valueOf(this.context.id().index()).getBytes(utf8);
        this.deleteNodeIfExists(this.paths.indexOfMe());
        this.context.log(log, Level.INFO, "Creating ephemeral master vote node with vote to self.");
        this.session.create(this.paths.indexOfMe(), val, acl, CreateMode.EPHEMERAL);
    }

    private void deleteNodeIfExists(String path) throws KeeperException, InterruptedException {
        if (this.session.exists(path, false) != null) {
            this.context.log(log, Level.INFO, "Removing master vote node at " + path);
            this.session.delete(path, -1);
        }
    }

    @Override
    public void close() {
        this.sessionOpen = false;
        try {
            this.context.log(log, Level.FINE, () -> "Trying to close ZooKeeper session 0x" + Long.toHexString(this.session.getSessionId()));
            this.session.close();
        }
        catch (InterruptedException e) {
            this.context.log(log, Level.WARNING, "Got interrupt exception while closing session: " + e);
        }
    }

    @Override
    public boolean isClosed() {
        return !this.sessionOpen || this.watcher.getState().equals((Object)Watcher.Event.KeeperState.Expired);
    }

    private void maybeLogExceptionWarning(Exception e, String message) {
        if (this.sessionOpen) {
            StringWriter sw = new StringWriter();
            e.printStackTrace(new PrintWriter(sw));
            this.context.log(log, Level.WARNING, message + ". Exception: " + e.getMessage() + "\n" + sw);
        }
    }

    @Override
    public boolean storeMasterVote(int wantedMasterIndex) {
        byte[] val = String.valueOf(wantedMasterIndex).getBytes(utf8);
        try {
            this.session.setData(this.paths.indexOfMe(), val, -1);
            this.context.log(log, Level.INFO, "Stored new vote in ephemeral node. " + this.context.id().index() + " -> " + wantedMasterIndex);
            return true;
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            this.maybeLogExceptionWarning(e, "Failed to create our ephemeral node and store master vote");
            return false;
        }
    }

    @Override
    public boolean storeLatestSystemStateVersion(int version) {
        byte[] data = Integer.toString(version).getBytes(utf8);
        try {
            this.context.log(log, Level.INFO, "Storing new cluster state version in ZooKeeper: " + version);
            Stat stat = this.session.setData(this.paths.latestVersion(), data, this.lastKnownStateVersionZNodeVersion);
            this.lastKnownStateVersionZNodeVersion = stat.getVersion();
            return true;
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        catch (KeeperException.BadVersionException e) {
            throw new CasWriteFailed(String.format("version mismatch in cluster state version znode (expected %d): %s", this.lastKnownStateVersionZNodeVersion, e.getMessage()), e);
        }
        catch (Exception e) {
            this.maybeLogExceptionWarning(e, "Failed to store latest system state version used " + version);
            return false;
        }
    }

    @Override
    public Integer retrieveLatestSystemStateVersion() {
        byte[] data;
        Stat stat = new Stat();
        this.context.log(log, Level.FINE, "Fetching latest cluster state at '%s'", this.paths.latestVersion(), new Object[0]);
        try {
            data = this.session.getData(this.paths.latestVersion(), false, stat);
        }
        catch (KeeperException.NoNodeException e) {
            this.lastKnownStateVersionZNodeVersion = 0;
            this.maybeLogExceptionWarning((Exception)((Object)e), "No latest system state found");
            return null;
        }
        catch (InterruptedException | KeeperException e) {
            throw new RuntimeException("Failed to get " + this.paths.latestVersion(), e);
        }
        this.lastKnownStateVersionZNodeVersion = stat.getVersion();
        Integer versionNumber = Integer.valueOf(new String(data, utf8));
        this.context.log(log, Level.INFO, "Read cluster state version %d from ZooKeeper (znode version %d)", versionNumber, stat.getVersion());
        return versionNumber;
    }

    @Override
    public boolean storeWantedStates(Map<Node, NodeState> states) {
        if (states == null) {
            states = new TreeMap<Node, NodeState>();
        }
        StringBuilder sb = new StringBuilder();
        for (Node node : states.keySet()) {
            NodeState nodeState = states.get(node);
            if (nodeState.equals((Object)new NodeState(node.getType(), State.UP))) continue;
            NodeState toStore = new NodeState(node.getType(), nodeState.getState());
            toStore.setDescription(nodeState.getDescription());
            if (!toStore.equals((Object)nodeState)) {
                log.warning("Attempted to store wanted state with more than just a main state. Extra data stripped. Original data '" + nodeState.serialize(true));
            }
            sb.append(node).append(':').append(toStore.serialize(true)).append('\n');
        }
        byte[] val = sb.toString().getBytes(utf8);
        try {
            this.context.log(log, Level.FINE, () -> "Storing wanted states at '" + this.paths.wantedStates() + "'");
            this.session.setData(this.paths.wantedStates(), val, -1);
            return true;
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            this.maybeLogExceptionWarning(e, "Failed to store wanted states in ZooKeeper");
            return false;
        }
    }

    @Override
    public Map<Node, NodeState> retrieveWantedStates() {
        try {
            this.context.log(log, Level.FINE, () -> "Fetching wanted states at '" + this.paths.wantedStates() + "'");
            Stat stat = new Stat();
            byte[] data = this.session.getData(this.paths.wantedStates(), false, stat);
            TreeMap<Node, NodeState> wanted = new TreeMap<Node, NodeState>();
            if (data != null && data.length > 0) {
                StringTokenizer st = new StringTokenizer(new String(data, utf8), "\n", false);
                while (st.hasMoreTokens()) {
                    String token = st.nextToken();
                    int colon = token.indexOf(58);
                    try {
                        if (colon < 0) {
                            throw new Exception();
                        }
                        Node node = new Node(token.substring(0, colon));
                        NodeState nodeState = NodeState.deserialize((NodeType)node.getType(), (String)token.substring(colon + 1));
                        wanted.put(node, nodeState);
                    }
                    catch (Exception e) {
                        this.context.log(log, Level.WARNING, "Ignoring invalid wantedstate line in zookeeper '" + token + "'.");
                    }
                }
            }
            return wanted;
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            this.maybeLogExceptionWarning(e, "Failed to retrieve wanted states from ZooKeeper");
            return null;
        }
    }

    @Override
    public boolean storeStartTimestamps(Map<Node, Long> timestamps) {
        if (timestamps == null) {
            timestamps = new TreeMap<Node, Long>();
        }
        StringBuilder sb = new StringBuilder();
        for (Node n : timestamps.keySet()) {
            Long timestamp = timestamps.get(n);
            sb.append(n.toString()).append(':').append(timestamp).append('\n');
        }
        byte[] val = sb.toString().getBytes(utf8);
        try {
            this.context.log(log, Level.FINE, () -> "Storing start timestamps at '" + this.paths.startTimestamps() + "'");
            this.session.setData(this.paths.startTimestamps(), val, -1);
            return true;
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            this.maybeLogExceptionWarning(e, "Failed to store start timestamps in ZooKeeper");
            return false;
        }
    }

    @Override
    public Map<Node, Long> retrieveStartTimestamps() {
        try {
            this.context.log(log, Level.FINE, () -> "Fetching start timestamps at '" + this.paths.startTimestamps() + "'");
            Stat stat = new Stat();
            byte[] data = this.session.getData(this.paths.startTimestamps(), false, stat);
            TreeMap<Node, Long> wanted = new TreeMap<Node, Long>();
            if (data != null && data.length > 0) {
                StringTokenizer st = new StringTokenizer(new String(data, utf8), "\n", false);
                while (st.hasMoreTokens()) {
                    String token = st.nextToken();
                    int colon = token.indexOf(58);
                    try {
                        if (colon < 0) {
                            throw new Exception();
                        }
                        Node n = new Node(token.substring(0, colon));
                        Long timestamp = Long.valueOf(token.substring(colon + 1));
                        wanted.put(n, timestamp);
                    }
                    catch (Exception e) {
                        this.context.log(log, Level.WARNING, "Ignoring invalid starttimestamp line in zookeeper '" + token + "'.");
                    }
                }
            }
            return wanted;
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            this.maybeLogExceptionWarning(e, "Failed to retrieve start timestamps from ZooKeeper");
            return null;
        }
    }

    @Override
    public boolean storeLastPublishedStateBundle(ClusterStateBundle stateBundle) {
        SlimeClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec();
        byte[] encodedBundle = envelopedBundleCodec.encodeWithEnvelope(stateBundle);
        try {
            this.context.log(log, Level.FINE, () -> String.format("Storing published state bundle %s at '%s' with expected znode version %d", stateBundle, this.paths.publishedStateBundle(), this.lastKnownStateBundleZNodeVersion));
            Stat stat = this.session.setData(this.paths.publishedStateBundle(), encodedBundle, this.lastKnownStateBundleZNodeVersion);
            this.lastKnownStateBundleZNodeVersion = stat.getVersion();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        catch (KeeperException.BadVersionException e) {
            throw new CasWriteFailed(String.format("version mismatch in cluster state bundle znode (expected %d): %s", this.lastKnownStateBundleZNodeVersion, e.getMessage()), e);
        }
        catch (Exception e) {
            this.maybeLogExceptionWarning(e, "Failed to store last published cluster state bundle in ZooKeeper");
            return false;
        }
        return true;
    }

    @Override
    public ClusterStateBundle retrieveLastPublishedStateBundle() {
        Stat stat = new Stat();
        try {
            byte[] data = this.session.getData(this.paths.publishedStateBundle(), false, stat);
            this.lastKnownStateBundleZNodeVersion = stat.getVersion();
            if (data != null && data.length != 0) {
                SlimeClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec();
                return envelopedBundleCodec.decodeWithEnvelope(data);
            }
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            this.maybeLogExceptionWarning(e, "Failed to retrieve last published cluster state bundle from ZooKeeper, will use an empty state as baseline");
        }
        this.lastKnownStateBundleZNodeVersion = 0;
        return ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.emptyState());
    }

    private class ZooKeeperWatcher
    implements Watcher {
        private Watcher.Event.KeeperState state = null;

        private ZooKeeperWatcher() {
        }

        public Watcher.Event.KeeperState getState() {
            return this.state == null ? Watcher.Event.KeeperState.SyncConnected : this.state;
        }

        public void process(WatchedEvent watchedEvent) {
            if (this.state != null && this.state.equals((Object)Watcher.Event.KeeperState.Expired)) {
                ZooKeeperDatabase.this.context.log(log, Level.WARNING, "Got event from ZooKeeper session after it expired");
                return;
            }
            Watcher.Event.KeeperState newState = watchedEvent.getState();
            if (this.state == null || !this.state.equals((Object)newState)) {
                switch (newState) {
                    case Expired: {
                        ZooKeeperDatabase.this.context.log(log, Level.INFO, "Zookeeper session expired");
                        ZooKeeperDatabase.this.sessionOpen = false;
                        ZooKeeperDatabase.this.listener.handleZooKeeperSessionDown();
                        break;
                    }
                    case Disconnected: {
                        ZooKeeperDatabase.this.context.log(log, Level.INFO, "Lost connection to zookeeper server");
                        ZooKeeperDatabase.this.sessionOpen = false;
                        ZooKeeperDatabase.this.listener.handleZooKeeperSessionDown();
                        break;
                    }
                    case SyncConnected: {
                        ZooKeeperDatabase.this.context.log(log, Level.INFO, "Connection to zookeeper server established. Refetching master data");
                        if (ZooKeeperDatabase.this.masterDataGatherer == null) break;
                        ZooKeeperDatabase.this.masterDataGatherer.restart();
                    }
                }
            }
            switch (watchedEvent.getType()) {
                case NodeChildrenChanged: {
                    ZooKeeperDatabase.this.context.log(log, Level.WARNING, "Got unexpected ZooKeeper event NodeChildrenChanged");
                    break;
                }
                case NodeDataChanged: {
                    ZooKeeperDatabase.this.context.log(log, Level.WARNING, "Got unexpected ZooKeeper event NodeDataChanged");
                    break;
                }
                case NodeCreated: {
                    ZooKeeperDatabase.this.context.log(log, Level.WARNING, "Got unexpected ZooKeeper event NodeCreated");
                    break;
                }
                case NodeDeleted: {
                    ZooKeeperDatabase.this.context.log(log, Level.WARNING, "Got unexpected ZooKeeper event NodeDeleted");
                    break;
                }
                case None: {
                    if (this.state == null || !this.state.equals((Object)watchedEvent.getState())) break;
                    ZooKeeperDatabase.this.context.log(log, Level.WARNING, "Got None type event that didn't even alter session state. What does that indicate?");
                }
            }
            this.state = watchedEvent.getState();
        }
    }
}

