/*
 * Decompiled with CFR 0.152.
 */
package org.fusesource.fabric.activemq;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.fusesource.fabric.groups.ChangeListener;
import org.fusesource.fabric.groups.Group;
import org.fusesource.fabric.groups.ZooKeeperGroupFactory;
import org.linkedin.util.clock.Timespan;
import org.linkedin.zookeeper.client.IZKClient;
import org.linkedin.zookeeper.client.ZKClient;

/**
 * A {@link DiscoveryAgent} that tracks the members of a ZooKeeper-backed
 * {@link Group} stored under {@code /fabric/activemq-clusters/<groupName>}.
 *
 * <p>Each member's payload is decoded as a UTF-8 string and reported to the
 * configured {@link DiscoveryListener} as a service. When a consumer reports a
 * service as failed via {@link #serviceFailed(DiscoveryEvent)}, the service is
 * withdrawn from the listener and re-announced later from a daemon thread,
 * optionally with exponential back-off.
 *
 * <p>{@link #start()} and {@link #stop()} are reference counted: only the first
 * {@code start()} connects and only the matching last {@code stop()} disconnects.
 */
public class FabricDiscoveryAgent
implements DiscoveryAgent {
    private static final Log LOG = LogFactory.getLog(FabricDiscoveryAgent.class);
    /** Charset used for the group member payloads (both when joining and when decoding). */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private IZKClient zkClient;
    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
    private Group group;
    private String groupName = "default";
    private final AtomicBoolean running = new AtomicBoolean();
    private final AtomicReference<DiscoveryListener> discoveryListener = new AtomicReference<DiscoveryListener>();
    /** Services currently known to be in the group, keyed by service string. Guarded by its own monitor. */
    private final HashMap<String, SimpleDiscoveryEvent> discoveredServices = new HashMap<String, SimpleDiscoveryEvent>();
    /** Number of outstanding {@link #start()} calls. */
    private final AtomicInteger startCounter = new AtomicInteger(0);
    private long initialReconnectDelay = 1000L;
    private long maxReconnectDelay = 30000L;
    private long backOffMultiplier = 2L;
    private boolean useExponentialBackOff = true;
    /** Maximum consecutive quick failures before giving up on a service; values {@code <= 0} mean unlimited. */
    private int maxReconnectAttempts = 0;
    /** Monitor the reconnect threads wait on; notified by {@link #stop()} to cut the wait short. */
    private final Object sleepMutex = new Object();
    /** A failure reported within this many ms of the (re)announcement counts as a connection failure. */
    private long minConnectTime = 5000L;
    private String serviceName;

    /**
     * Sets the name of the group to track. Only read by {@link #start()} when the
     * group is created, so it has no effect on an already started agent.
     *
     * @param groupName last path segment under {@code /fabric/activemq-clusters/}
     */
    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    /**
     * Remembers the service this agent advertises and, if the agent is already
     * started, joins the group with it immediately. If the agent is not started
     * yet, the join is performed by {@link #start()}.
     *
     * @param service the service string to advertise; used both as member id and (UTF-8 encoded) as payload
     * @throws IOException declared by the {@link DiscoveryAgent} contract
     */
    public synchronized void registerService(String service) throws IOException {
        this.serviceName = service;
        // Join whenever the agent is running, not only when exactly one start() is outstanding.
        if (this.startCounter.get() > 0 && this.group != null) {
            this.group.join(this.serviceName, this.serviceName.getBytes(UTF_8));
        }
    }

    /**
     * Handles a failure report for an event previously handed to the listener.
     *
     * <p>The first report for an event (guarded by its {@code failed} flag) withdraws
     * the service from the listener. Unless the service has meanwhile left the group,
     * a daemon thread is started that re-announces it, see
     * {@link #reconnect(SimpleDiscoveryEvent)}. Repeated reports while the event is
     * already marked failed are ignored.
     *
     * @param devent the failed event; must be an event created by this agent
     * @throws IOException declared by the {@link DiscoveryAgent} contract
     */
    public void serviceFailed(DiscoveryEvent devent) throws IOException {
        final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
        if (!event.failed.compareAndSet(false, true)) {
            return;
        }
        DiscoveryListener listener = this.discoveryListener.get();
        if (listener != null) {
            listener.onServiceRemove(event);
        }
        if (event.removed.get()) {
            // The service left the group; update() will announce it again if it comes back.
            return;
        }
        Thread thread = new Thread(){

            @Override
            public void run() {
                FabricDiscoveryAgent.this.reconnect(event);
            }
        };
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Re-announces a failed service to the listener; runs on the thread started by
     * {@link #serviceFailed(DiscoveryEvent)}.
     *
     * <p>If the failure came within {@code minConnectTime} of the last announcement it
     * is counted as a connection failure: the attempt limit is enforced, the thread
     * waits {@code reconnectDelay} ms on {@code sleepMutex}, and the delay for the
     * next round is recomputed. Otherwise the failure counter and delay are reset and
     * the service is re-announced without waiting. Nothing is announced if the agent
     * has been stopped or the service has left the group.
     */
    private void reconnect(SimpleDiscoveryEvent event) {
        if (event.connectTime + this.minConnectTime > System.currentTimeMillis()) {
            LOG.debug("Failure occured soon after the discovery event was generated.  It will be clasified as a connection failure: " + event);
            event.connectFailures++;
            if (this.maxReconnectAttempts > 0 && event.connectFailures >= this.maxReconnectAttempts) {
                LOG.debug("Reconnect attempts exceeded " + this.maxReconnectAttempts + " tries.  Reconnecting has been disabled.");
                return;
            }
            synchronized (this.sleepMutex) {
                try {
                    if (!this.running.get() || event.removed.get()) {
                        return;
                    }
                    LOG.debug("Waiting " + event.reconnectDelay + " ms before attepting to reconnect.");
                    this.sleepMutex.wait(event.reconnectDelay);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            if (this.useExponentialBackOff) {
                event.reconnectDelay = Math.min(event.reconnectDelay * this.backOffMultiplier, this.maxReconnectDelay);
            } else {
                event.reconnectDelay = this.initialReconnectDelay;
            }
        } else {
            event.connectFailures = 0;
            event.reconnectDelay = this.initialReconnectDelay;
        }
        // Re-check after the wait: stop() or a group change may have happened meanwhile.
        if (!this.running.get() || event.removed.get()) {
            return;
        }
        event.connectTime = System.currentTimeMillis();
        event.failed.set(false);
        DiscoveryListener listener = this.discoveryListener.get();
        if (listener != null) {
            listener.onServiceAdd(event);
        }
    }

    /**
     * Sets the listener that receives service add/remove notifications.
     *
     * @param discoveryListener the listener; while it is {@code null}, group changes are ignored
     */
    public void setDiscoveryListener(DiscoveryListener discoveryListener) {
        this.discoveryListener.set(discoveryListener);
    }

    /**
     * Starts the agent. Only the first outstanding call does any work: it connects a
     * ZooKeeper client to the address in system property {@code zookeeper.url}
     * (default {@code localhost:2181}) if none is present, opens the group, subscribes
     * to membership changes and joins the group if a service has been registered.
     *
     * @throws Exception if connecting to ZooKeeper or joining the group fails
     */
    public synchronized void start() throws Exception {
        if (this.startCounter.incrementAndGet() != 1) {
            return;
        }
        this.running.set(true);
        if (this.zkClient == null) {
            ZKClient client = new ZKClient(System.getProperty("zookeeper.url", "localhost:2181"), Timespan.parse("10s"), null);
            client.start();
            client.waitForStart();
            this.zkClient = client;
        }
        this.group = ZooKeeperGroupFactory.create(this.zkClient, "/fabric/activemq-clusters/" + this.groupName, this.acl);
        this.group.add(new ChangeListener(){

            @Override
            public void changed(byte[][] members2) {
                FabricDiscoveryAgent.this.update(members2);
            }
        });
        if (this.serviceName != null) {
            this.group.join(this.serviceName, this.serviceName.getBytes(UTF_8));
        }
    }

    /**
     * Stops the agent once the last outstanding {@link #start()} has been matched.
     * Wakes any reconnect threads so they exit promptly, then closes the group and
     * the ZooKeeper client. A call without a matching {@code start()} is ignored.
     *
     * @throws Exception if closing the group or the ZooKeeper client fails
     */
    public synchronized void stop() throws Exception {
        if (this.startCounter.get() <= 0) {
            // Unbalanced stop(): do not drive the counter negative, or start() would never run again.
            return;
        }
        if (this.startCounter.decrementAndGet() != 0) {
            return;
        }
        this.running.set(false);
        synchronized (this.sleepMutex) {
            // Reconnect threads re-check 'running' after waking and bail out.
            this.sleepMutex.notifyAll();
        }
        try {
            // group/zkClient may be null if start() failed part-way.
            if (this.group != null) {
                this.group.close();
            }
        }
        finally {
            if (this.zkClient != null) {
                try {
                    this.zkClient.close();
                }
                finally {
                    this.zkClient = null;
                }
            }
        }
    }

    /**
     * Reconciles the known services with a new membership snapshot and notifies the
     * listener of the difference: members not seen before are announced with
     * {@code onServiceAdd}, known services missing from the snapshot are marked
     * removed and reported with {@code onServiceRemove}.
     *
     * <p>Does nothing while no listener is set or when the snapshot is {@code null}.
     *
     * @param members2 payloads of the current group members, each a UTF-8 encoded service string
     */
    private void update(byte[][] members2) {
        DiscoveryListener listener = this.discoveryListener.get();
        if (listener == null || members2 == null) {
            return;
        }
        HashSet<String> activeServices = new HashSet<String>();
        for (byte[] member : members2) {
            if (member != null) {
                activeServices.add(new String(member, UTF_8));
            }
        }
        synchronized (this.discoveredServices) {
            HashSet<String> removedServices = new HashSet<String>(this.discoveredServices.keySet());
            removedServices.removeAll(activeServices);
            HashSet<String> addedServices = new HashSet<String>(activeServices);
            addedServices.removeAll(this.discoveredServices.keySet());
            for (String service : addedServices) {
                SimpleDiscoveryEvent added = new SimpleDiscoveryEvent(service);
                this.discoveredServices.put(service, added);
                listener.onServiceAdd(added);
            }
            for (String service : removedServices) {
                SimpleDiscoveryEvent gone = this.discoveredServices.remove(service);
                if (gone != null) {
                    // Tells any pending reconnect thread not to re-announce this service.
                    gone.removed.set(true);
                    listener.onServiceRemove(gone);
                }
            }
        }
    }

    /**
     * Discovery event carrying the per-service reconnect bookkeeping used by
     * {@link FabricDiscoveryAgent#serviceFailed(DiscoveryEvent)}.
     */
    class SimpleDiscoveryEvent
    extends DiscoveryEvent {
        /** Consecutive failures that happened within {@code minConnectTime} of an announcement. */
        private int connectFailures;
        /** Delay in ms to wait before the next re-announcement. */
        private long reconnectDelay;
        /** Time (ms since epoch) the service was last announced to the listener. */
        private long connectTime;
        /** Set while a failure is being handled; cleared when the service is re-announced. */
        private final AtomicBoolean failed;
        /** Set once the service has disappeared from the group. */
        private final AtomicBoolean removed;

        public SimpleDiscoveryEvent(String service) {
            super(service);
            this.reconnectDelay = FabricDiscoveryAgent.this.initialReconnectDelay;
            this.connectTime = System.currentTimeMillis();
            this.failed = new AtomicBoolean(false);
            this.removed = new AtomicBoolean(false);
        }
    }
}

