/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.locks;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.gobblin.runtime.locks.JobLockEventListener;
import org.apache.gobblin.runtime.locks.JobLockException;
import org.apache.gobblin.runtime.locks.ListenableJobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ListenableJobLock} backed by a Curator {@link InterProcessSemaphoreMutex} stored under
 * {@value #LOCKS_ROOT_PATH} in ZooKeeper.
 *
 * <p>All instances in the JVM share a single, lazily created {@link CuratorFramework} client. The
 * client is configured from the properties passed to the first instance that is constructed and is
 * closed by a JVM shutdown hook. Registered {@link JobLockEventListener}s are notified through
 * {@link JobLockEventListener#onLost()} when the shared client reports that its connection was lost
 * or suspended.
 */
public class ZookeeperBasedJobLock implements ListenableJobLock {
    private static final Logger log = LoggerFactory.getLogger(ZookeeperBasedJobLock.class);

    private static final String LOCKS_ROOT_PATH = "/locks";
    private static final String CONNECTION_STRING_DEFAULT = "localhost:2181";
    private static final int LOCKS_ACQUIRE_TIMEOUT_MILLISECONDS_DEFAULT = 5000;
    private static final int CONNECTION_TIMEOUT_SECONDS_DEFAULT = 30;
    private static final int SESSION_TIMEOUT_SECONDS_DEFAULT = 180;
    private static final int RETRY_BACKOFF_SECONDS_DEFAULT = 1;
    private static final int MAX_RETRY_COUNT_DEFAULT = 10;

    /** Property key: how long {@link #tryLock()} waits for the lock, in milliseconds. */
    public static final String LOCKS_ACQUIRE_TIMEOUT_MILLISECONDS = "gobblin.locks.zookeeper.acquire.timeout_milliseconds";
    /** Property key: ZooKeeper connection string. */
    public static final String CONNECTION_STRING = "gobblin.locks.zookeeper.connection_string";
    /** Property key: connection timeout, in seconds. */
    public static final String CONNECTION_TIMEOUT_SECONDS = "gobblin.locks.zookeeper.connection.timeout_seconds";
    /** Property key: session timeout, in seconds. */
    public static final String SESSION_TIMEOUT_SECONDS = "gobblin.locks.zookeeper.session.timeout_seconds";
    /** Property key: base back-off between connection retries, in seconds. */
    public static final String RETRY_BACKOFF_SECONDS = "gobblin.locks.zookeeper.retry.backoff_seconds";
    /** Property key: maximum number of connection retries. */
    public static final String MAX_RETRY_COUNT = "gobblin.locks.zookeeper.retry.max_count";

    /** Listeners to notify on connection loss, keyed by lock path. */
    private static final ConcurrentMap<String, JobLockEventListener> lockEventListeners = Maps.newConcurrentMap();
    /** Shared client; guarded by the class monitor. */
    private static CuratorFramework curatorFramework;
    /** Shutdown hook that closes the shared client; guarded by the class monitor. */
    private static Thread curatorFrameworkShutdownHook;

    private final String lockPath;
    private final long lockAcquireTimeoutMilliseconds;
    private final InterProcessLock lock;

    /**
     * Creates a lock for the job named by the {@code job.name} property.
     *
     * @param properties lock configuration; must contain {@code job.name}
     * @throws NullPointerException if {@code job.name} is not set
     */
    public ZookeeperBasedJobLock(Properties properties) {
        this(properties, properties.getProperty("job.name"));
    }

    /**
     * Creates a lock for the given job, connecting the shared ZooKeeper client if necessary.
     *
     * @param properties lock configuration (see the public {@code String} constants of this class)
     * @param jobName name of the job; used as the lock node name under the locks root
     * @throws NullPointerException if {@code jobName} is {@code null}
     * @throws RuntimeException if the ZooKeeper connection cannot be established
     */
    public ZookeeperBasedJobLock(Properties properties, String jobName) {
        this.lockAcquireTimeoutMilliseconds = getLong(properties, LOCKS_ACQUIRE_TIMEOUT_MILLISECONDS,
                LOCKS_ACQUIRE_TIMEOUT_MILLISECONDS_DEFAULT);
        this.lockPath = buildLockPath(jobName);
        // Use the client returned under the class monitor rather than re-reading the static field,
        // which a concurrent shutdown could have reset to null in the meantime.
        CuratorFramework client = initializeCuratorFramework(properties);
        this.lock = new InterProcessSemaphoreMutex(client, this.lockPath);
    }

    /**
     * Registers the listener to notify when the ZooKeeper connection is lost or suspended.
     *
     * <p>Only the first listener registered for a given lock path is kept; later registrations for
     * the same path are ignored until {@link #close()} removes the entry.
     */
    @Override
    public void setEventListener(JobLockEventListener jobLockEventListener) {
        lockEventListeners.putIfAbsent(this.lockPath, jobLockEventListener);
    }

    /**
     * Acquires the lock, delegating to {@link InterProcessLock#acquire()}.
     *
     * @throws JobLockException if the underlying acquire call fails
     */
    @Override
    public void lock() throws JobLockException {
        try {
            this.lock.acquire();
        } catch (Exception e) {
            throw new JobLockException("Failed to acquire lock " + this.lockPath, e);
        }
    }

    /**
     * Releases the lock if it is currently held by this process; otherwise does nothing.
     *
     * @throws JobLockException if the underlying release call fails
     */
    @Override
    public void unlock() throws JobLockException {
        if (!this.lock.isAcquiredInThisProcess()) {
            return;
        }
        try {
            this.lock.release();
        } catch (Exception e) {
            throw new JobLockException("Failed to release lock " + this.lockPath, e);
        }
    }

    /**
     * Tries to acquire the lock, waiting at most the configured acquire timeout.
     *
     * @return {@code true} if the lock was acquired, {@code false} if the timeout elapsed first
     * @throws JobLockException if the underlying acquire call fails
     */
    @Override
    public boolean tryLock() throws JobLockException {
        try {
            return this.lock.acquire(this.lockAcquireTimeoutMilliseconds, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new JobLockException("Failed to acquire lock " + this.lockPath, e);
        }
    }

    /**
     * @return whether the lock is currently held by this process
     */
    @Override
    public boolean isLocked() throws JobLockException {
        return this.lock.isAcquiredInThisProcess();
    }

    /**
     * Releases the lock (if held) and always unregisters this lock's event listener.
     *
     * @throws IOException wrapping the {@link JobLockException} if the release fails
     */
    @Override
    public void close() throws IOException {
        try {
            this.unlock();
        } catch (JobLockException e) {
            throw new IOException(e);
        } finally {
            lockEventListeners.remove(this.lockPath);
        }
    }

    /**
     * Builds the ZooKeeper node path for a job's lock.
     *
     * <p>ZooKeeper paths always use '/', so the path is assembled by hand instead of through
     * {@code java.nio.file.Paths}, which would emit the platform separator (a backslash on Windows).
     * Empty segments in the job name (leading, trailing or doubled slashes) are dropped.
     */
    private static String buildLockPath(String jobName) {
        if (jobName == null) {
            throw new NullPointerException("Job name must not be null when creating a zookeeper job lock");
        }
        StringBuilder path = new StringBuilder(LOCKS_ROOT_PATH);
        for (String segment : jobName.split("/")) {
            if (!segment.isEmpty()) {
                path.append('/').append(segment);
            }
        }
        return path.toString();
    }

    /**
     * Returns the shared Curator client, creating, starting and connecting it on first use, and
     * registers the JVM shutdown hook the first time it is called.
     *
     * @throws RuntimeException if the connection times out or the waiting thread is interrupted
     */
    private static synchronized CuratorFramework initializeCuratorFramework(Properties properties) {
        if (curatorFrameworkShutdownHook == null) {
            curatorFrameworkShutdownHook = new CuratorFrameworkShutdownHook();
            Runtime.getRuntime().addShutdownHook(curatorFrameworkShutdownHook);
        }
        if (curatorFramework != null) {
            return curatorFramework;
        }

        // Parse every setting before starting the client so a malformed value cannot leave a
        // started-but-unreferenced client behind.
        String connectString = properties.getProperty(CONNECTION_STRING, CONNECTION_STRING_DEFAULT);
        int connectionTimeoutSeconds = getInt(properties, CONNECTION_TIMEOUT_SECONDS, CONNECTION_TIMEOUT_SECONDS_DEFAULT);
        int connectionTimeoutMs = getMilliseconds(properties, CONNECTION_TIMEOUT_SECONDS, CONNECTION_TIMEOUT_SECONDS_DEFAULT);
        int sessionTimeoutMs = getMilliseconds(properties, SESSION_TIMEOUT_SECONDS, SESSION_TIMEOUT_SECONDS_DEFAULT);
        int retryBackoffMs = getMilliseconds(properties, RETRY_BACKOFF_SECONDS, RETRY_BACKOFF_SECONDS_DEFAULT);
        int maxRetryCount = getInt(properties, MAX_RETRY_COUNT, MAX_RETRY_COUNT_DEFAULT);

        CuratorFramework newCuratorFramework = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                .retryPolicy(new ExponentialBackoffRetry(retryBackoffMs, maxRetryCount))
                .build();
        newCuratorFramework.getConnectionStateListenable().addListener(new LockLossNotifier());
        newCuratorFramework.start();

        boolean connected;
        try {
            connected = newCuratorFramework.blockUntilConnected(connectionTimeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Close before restoring the interrupt flag so the close itself is not cut short.
            newCuratorFramework.close();
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting to connect to zookeeper", e);
        }
        if (!connected) {
            // Do not leak the started client (and its background threads) on failure.
            newCuratorFramework.close();
            throw new RuntimeException("Time out while waiting to connect to zookeeper");
        }

        curatorFramework = newCuratorFramework;
        return newCuratorFramework;
    }

    /**
     * Closes and forgets the shared Curator client, if one exists. A later constructor call will
     * create a fresh client.
     */
    @VisibleForTesting
    static synchronized void shutdownCuratorFramework() {
        if (curatorFramework != null) {
            curatorFramework.close();
            curatorFramework = null;
        }
    }

    /** Reads an integer property, falling back to {@code defaultValue} when the key is absent. */
    private static int getInt(Properties properties, String key, int defaultValue) {
        return Integer.parseInt(properties.getProperty(key, Integer.toString(defaultValue)).trim());
    }

    /** Reads a long property, falling back to {@code defaultValue} when the key is absent. */
    private static long getLong(Properties properties, String key, long defaultValue) {
        return Long.parseLong(properties.getProperty(key, Long.toString(defaultValue)).trim());
    }

    /**
     * Reads a property expressed in seconds and converts it to milliseconds.
     *
     * @throws IllegalArgumentException if the value does not fit in an {@code int} once converted
     */
    private static int getMilliseconds(Properties properties, String key, int defaultValue) {
        int seconds = getInt(properties, key, defaultValue);
        try {
            return Math.multiplyExact(seconds, 1000);
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(
                    "Value of " + key + " is too large to express in milliseconds: " + seconds, e);
        }
    }

    /** Notifies every registered listener that its lock was lost, logging with the given format. */
    private static void notifyLockLost(String messageFormat) {
        for (Map.Entry<String, JobLockEventListener> entry : lockEventListeners.entrySet()) {
            log.warn(messageFormat, entry.getKey());
            entry.getValue().onLost();
        }
    }

    /**
     * Logs connection state changes of the shared client and tells registered listeners that their
     * lock was lost when the connection is lost or suspended.
     */
    private static class LockLossNotifier implements ConnectionStateListener {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState connectionState) {
            switch (connectionState) {
                case LOST:
                    log.warn("Lost connection with zookeeper");
                    notifyLockLost("Informing job {} that lock was lost");
                    break;
                case SUSPENDED:
                    // A suspended connection is reported to listeners the same way as a lost one.
                    log.warn("Suspended connection with zookeeper");
                    notifyLockLost("Informing job {} that lock was suspended");
                    break;
                case CONNECTED:
                    log.info("Connected with zookeeper");
                    break;
                case RECONNECTED:
                    log.warn("Regained connection with zookeeper");
                    break;
                case READ_ONLY:
                    log.warn("Zookeeper connection went into read-only mode");
                    break;
                default:
                    break;
            }
        }
    }

    /** JVM shutdown hook that closes the shared Curator client. */
    private static class CuratorFrameworkShutdownHook extends Thread {
        private CuratorFrameworkShutdownHook() {
        }

        @Override
        public void run() {
            log.info("Shutting down curator framework...");
            try {
                ZookeeperBasedJobLock.shutdownCuratorFramework();
                log.info("Curator framework shut down.");
            } catch (Exception e) {
                log.error("Error while shutting down curator framework.", e);
            }
        }
    }
}

