/*
 * Decompiled with CFR 0.152.
 */
package org.atmosphere.cpr;

import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.atmosphere.cpr.AtmosphereResourceEventImpl;
import org.atmosphere.cpr.AtmosphereResourceImpl;
import org.atmosphere.cpr.AtmosphereServlet;
import org.atmosphere.cpr.BroadcastFilter;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterCache;
import org.atmosphere.cpr.BroadcasterConfig;
import org.atmosphere.cpr.BroadcasterFactory;
import org.atmosphere.cpr.BroadcasterFuture;
import org.atmosphere.cpr.BroadcasterLifeCyclePolicy;
import org.atmosphere.cpr.BroadcasterLifeCyclePolicyListener;
import org.atmosphere.di.InjectorProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultBroadcaster
implements Broadcaster {
    private static final Logger logger = LoggerFactory.getLogger(DefaultBroadcaster.class);
    public static final String CACHED = DefaultBroadcaster.class.getName() + ".messagesCached";
    private static final String DESTROYED = "This Broadcaster has been destroyed and cannot be used {} by invoking {}";
    protected final ConcurrentLinkedQueue<AtmosphereResource<?, ?>> resources = new ConcurrentLinkedQueue();
    protected BroadcasterConfig bc;
    protected final BlockingQueue<Entry> messages = new LinkedBlockingQueue<Entry>();
    protected final BlockingQueue<AsyncWriteToken> asyncWriteQueue = new LinkedBlockingQueue<AsyncWriteToken>();
    protected final AtomicBoolean started = new AtomicBoolean(false);
    protected final AtomicBoolean destroyed = new AtomicBoolean(false);
    protected Broadcaster.SCOPE scope = Broadcaster.SCOPE.APPLICATION;
    protected String name = DefaultBroadcaster.class.getSimpleName();
    protected final ConcurrentLinkedQueue<Entry> delayedBroadcast = new ConcurrentLinkedQueue();
    protected final ConcurrentLinkedQueue<Entry> broadcastOnResume = new ConcurrentLinkedQueue();
    protected final ConcurrentLinkedQueue<BroadcasterLifeCyclePolicyListener> lifeCycleListeners = new ConcurrentLinkedQueue();
    protected Future<?> notifierFuture;
    protected Future<?> asyncWriteFuture;
    protected BroadcasterCache broadcasterCache;
    private Broadcaster.POLICY policy = Broadcaster.POLICY.FIFO;
    private final AtomicLong maxSuspendResource = new AtomicLong(-1L);
    private final AtomicBoolean requestScoped = new AtomicBoolean(false);
    private BroadcasterLifeCyclePolicy lifeCyclePolicy = new BroadcasterLifeCyclePolicy.Builder().policy(BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.NEVER).build();
    private Future<?> currentLifecycleTask;
    protected URI uri;
    protected AtmosphereServlet.AtmosphereConfig config;

    public DefaultBroadcaster(String name, URI uri, AtmosphereServlet.AtmosphereConfig config) {
        this.name = name;
        this.uri = uri;
        this.config = config;
        this.broadcasterCache = new BroadcasterConfig.DefaultBroadcasterCache();
        this.bc = new BroadcasterConfig(AtmosphereServlet.broadcasterFilters, config);
    }

    public DefaultBroadcaster(String name, AtmosphereServlet.AtmosphereConfig config) {
        this(name, URI.create("http://localhost"), config);
    }

    @Override
    public synchronized void destroy() {
        if (this.destroyed.get()) {
            return;
        }
        try {
            logger.trace("Broadcaster {} is being destroyed and cannot be re-used", (Object)this.getID());
            if (BroadcasterFactory.getDefault() != null) {
                BroadcasterFactory.getDefault().remove(this, this.getID());
            }
            if (this.currentLifecycleTask != null) {
                this.currentLifecycleTask.cancel(true);
            }
            this.started.set(false);
            this.destroyed.set(true);
            this.releaseExternalResources();
            if (this.notifierFuture != null) {
                this.notifierFuture.cancel(true);
            }
            if (this.asyncWriteFuture != null) {
                this.asyncWriteFuture.cancel(true);
            }
            if (this.bc != null) {
                this.bc.destroy();
            }
            if (this.broadcasterCache != null) {
                this.broadcasterCache.stop();
            }
            this.resources.clear();
            this.broadcastOnResume.clear();
            this.messages.clear();
            this.asyncWriteQueue.clear();
            this.delayedBroadcast.clear();
            this.broadcasterCache = null;
        }
        catch (Throwable t) {
            logger.error("Unexpected exception during Broadcaster destroy {}", (Object)this.getID(), (Object)t);
        }
    }

    @Override
    public Collection<AtmosphereResource<?, ?>> getAtmosphereResources() {
        return Collections.unmodifiableCollection(this.resources);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setScope(Broadcaster.SCOPE scope) {
        this.scope = scope;
        if (scope != Broadcaster.SCOPE.REQUEST) {
            return;
        }
        logger.debug("Changing broadcaster scope for {}. This broadcaster will be destroyed.", (Object)this.getID());
        ConcurrentLinkedQueue<AtmosphereResource<?, ?>> concurrentLinkedQueue = this.resources;
        synchronized (concurrentLinkedQueue) {
            try {
                for (AtmosphereResource<?, ?> resource : this.resources) {
                    Broadcaster b = BroadcasterFactory.getDefault().get(this.getClass(), this.getClass().getSimpleName() + "/" + UUID.randomUUID());
                    if (DefaultBroadcaster.class.isAssignableFrom(this.getClass())) {
                        BroadcasterCache cache = (BroadcasterCache)this.bc.getBroadcasterCache().getClass().newInstance();
                        InjectorProvider.getInjector().inject(cache);
                        ((DefaultBroadcaster)DefaultBroadcaster.class.cast((Object)b)).broadcasterCache = cache;
                    }
                    resource.setBroadcaster(b);
                    b.setScope(Broadcaster.SCOPE.REQUEST);
                    if (resource.getAtmosphereResourceEvent().isSuspended()) {
                        b.addAtmosphereResource(resource);
                    }
                    logger.debug("Resource {} not using broadcaster {}", resource, (Object)b.getID());
                }
                this.destroy();
            }
            catch (Exception e) {
                logger.error("Failed to set request scope for current resources", (Throwable)e);
            }
        }
    }

    @Override
    public Broadcaster.SCOPE getScope() {
        return this.scope;
    }

    @Override
    public synchronized void setID(String id) {
        Broadcaster b;
        if (id == null) {
            id = this.getClass().getSimpleName() + "/" + UUID.randomUUID();
        }
        if ((b = BroadcasterFactory.getDefault().lookup(this.getClass(), id)) != null && b.getScope() == Broadcaster.SCOPE.REQUEST) {
            throw new IllegalStateException("Broadcaster ID already assigned to SCOPE.REQUEST. Cannot change the id");
        }
        if (b != null) {
            return;
        }
        BroadcasterFactory.getDefault().remove(this, this.name);
        this.name = id;
        BroadcasterFactory.getDefault().add(this, this.name);
    }

    @Override
    public String getID() {
        return this.name;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void resumeAll() {
        ConcurrentLinkedQueue<AtmosphereResource<?, ?>> concurrentLinkedQueue = this.resources;
        synchronized (concurrentLinkedQueue) {
            for (AtmosphereResource<?, ?> r : this.resources) {
                try {
                    r.resume();
                }
                catch (Throwable t) {
                    logger.trace("resumeAll", t);
                }
            }
        }
    }

    @Override
    public void releaseExternalResources() {
    }

    @Override
    public void setBroadcasterLifeCyclePolicy(final BroadcasterLifeCyclePolicy lifeCyclePolicy) {
        this.lifeCyclePolicy = lifeCyclePolicy;
        if (this.currentLifecycleTask != null) {
            this.currentLifecycleTask.cancel(false);
        }
        if (this.bc != null && this.bc.getScheduledExecutorService() == null) {
            logger.error("No Broadcaster's SchedulerExecutorService has been configured on {}. BroadcasterLifeCyclePolicy won't work.", (Object)this.getID());
            return;
        }
        if (lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE || lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE_RESUME || lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE_DESTROY) {
            int time = lifeCyclePolicy.getTimeout();
            if (time == -1) {
                throw new IllegalStateException("BroadcasterLifeCyclePolicy time is not set");
            }
            final AtomicReference ref = new AtomicReference();
            this.currentLifecycleTask = this.bc.getScheduledExecutorService().scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    try {
                        if (DefaultBroadcaster.this.resources.isEmpty()) {
                            if (lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE) {
                                DefaultBroadcaster.this.notifyEmptyListener();
                                DefaultBroadcaster.this.notifyIdleListener();
                                DefaultBroadcaster.this.releaseExternalResources();
                                logger.debug("Applying BroadcasterLifeCyclePolicy IDLE policy to Broadcaster {}", (Object)DefaultBroadcaster.this.getID());
                            } else if (lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE_DESTROY) {
                                DefaultBroadcaster.this.notifyEmptyListener();
                                DefaultBroadcaster.this.notifyIdleListener();
                                this.destroy(false);
                                logger.debug("Applying BroadcasterLifeCyclePolicy IDLE_DESTROY policy to Broadcaster {}", (Object)DefaultBroadcaster.this.getID());
                            }
                        } else if (lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE_RESUME) {
                            DefaultBroadcaster.this.notifyIdleListener();
                            this.destroy(true);
                            logger.debug("Applying BroadcasterLifeCyclePolicy IDLE_RESUME policy to Broadcaster {}", (Object)DefaultBroadcaster.this.getID());
                        }
                    }
                    catch (Throwable t) {
                        if (DefaultBroadcaster.this.destroyed.get()) {
                            logger.trace("Scheduled BroadcasterLifeCyclePolicy exception", t);
                        }
                        logger.warn("Scheduled BroadcasterLifeCyclePolicy exception", t);
                    }
                }

                void destroy(boolean resume) {
                    DefaultBroadcaster.this.notifyDestroyListener();
                    if (resume) {
                        logger.info("All AtmosphereResource will now be resumed from Broadcaster {}", (Object)DefaultBroadcaster.this.getID());
                        DefaultBroadcaster.this.resumeAll();
                    }
                    DefaultBroadcaster.this.destroy();
                    if (ref.get() != null) {
                        DefaultBroadcaster.this.currentLifecycleTask.cancel(true);
                    }
                }
            }, time, time, lifeCyclePolicy.getTimeUnit());
            ref.set(this.currentLifecycleTask);
        }
    }

    @Override
    public void addBroadcasterLifeCyclePolicyListener(BroadcasterLifeCyclePolicyListener b) {
        this.lifeCycleListeners.add(b);
    }

    @Override
    public void removeBroadcasterLifeCyclePolicyListener(BroadcasterLifeCyclePolicyListener b) {
        this.lifeCycleListeners.remove(b);
    }

    @Override
    public boolean isDestroyed() {
        return this.destroyed.get();
    }

    protected Runnable getBroadcastHandler() {
        return new Runnable(){

            @Override
            public void run() {
                Entry msg = null;
                while (DefaultBroadcaster.this.started.get()) {
                    try {
                        msg = DefaultBroadcaster.this.messages.take();
                        DefaultBroadcaster.this.push(msg);
                    }
                    catch (Throwable ex) {
                        if (!DefaultBroadcaster.this.started.get() || DefaultBroadcaster.this.destroyed.get()) {
                            logger.trace("Failed to submit broadcast handler runnable on shutdown for Broadcaster {}", (Object)DefaultBroadcaster.this.getID(), (Object)ex);
                            continue;
                        }
                        logger.warn("This message {} will be lost", (Object)msg);
                        logger.debug("Failed to submit broadcast handler runnable to for Broadcaster {}", (Object)DefaultBroadcaster.this.getID(), (Object)ex);
                    }
                }
            }
        };
    }

    protected void start() {
        if (!this.started.getAndSet(true)) {
            this.setID(this.name);
            this.broadcasterCache = this.bc.getBroadcasterCache();
            this.broadcasterCache.start();
            this.notifierFuture = this.bc.getExecutorService().submit(this.getBroadcastHandler());
            this.asyncWriteFuture = this.bc.getAsyncWriteService().submit(this.getAsyncWriteHandler());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void push(Entry entry) {
        Object finalMsg;
        if (this.destroyed.get()) {
            return;
        }
        String prevMessage = entry.message.toString();
        if (!this.delayedBroadcast.isEmpty()) {
            Iterator<Entry> i = this.delayedBroadcast.iterator();
            StringBuilder b = new StringBuilder();
            while (i.hasNext()) {
                Entry e = i.next();
                e.future.cancel(true);
                try {
                    if (e.message instanceof String && entry.message instanceof String) {
                        b.append(e.message);
                        continue;
                    }
                    this.push(e);
                }
                finally {
                    i.remove();
                }
            }
            if (b.length() > 0) {
                entry.message = b.append(entry.message).toString();
            }
        }
        if (this.resources.isEmpty()) {
            logger.debug("Broadcaster {} doesn't have any associated resource", (Object)this.getID());
            this.trackBroadcastMessage(null, entry.message);
            if (entry.future != null) {
                entry.future.done();
            }
            return;
        }
        entry.message = finalMsg = this.translate(entry.message);
        try {
            if (entry.multipleAtmoResources == null) {
                for (AtmosphereResource<?, ?> r : this.resources) {
                    finalMsg = this.perRequestFilter(r, entry);
                    if (finalMsg == null) {
                        logger.debug("Skipping broadcast delivery resource {} ", r);
                        continue;
                    }
                    if (!entry.writeLocally) continue;
                    this.queueWriteIO(r, finalMsg, entry);
                }
            } else if (entry.multipleAtmoResources instanceof AtmosphereResource) {
                finalMsg = this.perRequestFilter((AtmosphereResource)entry.multipleAtmoResources, entry);
                if (finalMsg == null) {
                    logger.debug("Skipping broadcast delivery resource {} ", entry.multipleAtmoResources);
                    return;
                }
                if (entry.writeLocally) {
                    this.queueWriteIO((AtmosphereResource)entry.multipleAtmoResources, finalMsg, entry);
                }
            } else if (entry.multipleAtmoResources instanceof Set) {
                Set sub = (Set)entry.multipleAtmoResources;
                for (AtmosphereResource r : sub) {
                    finalMsg = this.perRequestFilter(r, entry);
                    if (finalMsg == null) {
                        logger.debug("Skipping broadcast delivery resource {} ", (Object)r);
                        continue;
                    }
                    if (!entry.writeLocally) continue;
                    this.queueWriteIO(r, finalMsg, entry);
                }
            }
            entry.message = prevMessage;
        }
        catch (InterruptedException ex) {
            logger.debug(ex.getMessage(), (Throwable)ex);
        }
    }

    protected void queueWriteIO(AtmosphereResource<?, ?> r, Object finalMsg, Entry entry) throws InterruptedException {
        this.asyncWriteQueue.put(new AsyncWriteToken(r, finalMsg, entry.future));
    }

    protected Object perRequestFilter(AtmosphereResource<?, ?> r, Entry msg) {
        Object finalMsg = msg.message;
        if (AtmosphereResourceImpl.class.isAssignableFrom(r.getClass())) {
            if (((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(r)).isInScope()) {
                if (r.getRequest() instanceof HttpServletRequest && this.bc.hasPerRequestFilters()) {
                    Object message = msg.originalMessage;
                    BroadcastFilter.BroadcastAction a = this.bc.filter((HttpServletRequest)r.getRequest(), (HttpServletResponse)r.getResponse(), message);
                    if (a.action() == BroadcastFilter.BroadcastAction.ACTION.ABORT) {
                        return null;
                    }
                    if (a.message() != msg.originalMessage) {
                        finalMsg = a.message();
                    }
                }
                this.trackBroadcastMessage(r, finalMsg);
            } else {
                this.removeAtmosphereResource((AtmosphereResource)r);
                BroadcasterFactory.getDefault().removeAllAtmosphereResource(r);
            }
        }
        return finalMsg;
    }

    private Object translate(Object msg) {
        if (Callable.class.isAssignableFrom(msg.getClass())) {
            try {
                return ((Callable)Callable.class.cast(msg)).call();
            }
            catch (Exception e) {
                logger.error("failed to cast message: " + msg, (Throwable)e);
            }
        }
        return msg;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void executeAsyncWrite(AtmosphereResource<?, ?> resource, Object msg, BroadcasterFuture future) {
        boolean notifyListeners = true;
        try {
            AtmosphereResourceEventImpl event = (AtmosphereResourceEventImpl)resource.getAtmosphereResourceEvent();
            event.setMessage(msg);
            if (!((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(resource)).isInScope()) {
                this.resources.remove(resource);
                return;
            }
            try {
                ((HttpServletRequest)HttpServletRequest.class.cast(resource.getRequest())).setAttribute(ApplicationConfig.MAX_INACTIVE, (Object)System.currentTimeMillis());
            }
            catch (Throwable t) {
                logger.error("Invalid AtmosphereResource state {}", (Object)event);
                logger.error("If you are using Tomcat 7.0.22 and lower, your most probably hitting http://is.gd/NqicFT");
                logger.error("", t);
                this.removeAtmosphereResource((AtmosphereResource)resource);
                BroadcasterFactory.getDefault().removeAllAtmosphereResource(resource);
                event.setCancelled(true);
                event.setThrowable(t);
                if (notifyListeners) {
                    resource.notifyListeners();
                }
                if (future != null) {
                    future.done();
                }
                return;
            }
            this.broadcast(resource, event);
        }
        finally {
            if (notifyListeners) {
                resource.notifyListeners();
            }
            if (future != null) {
                future.done();
            }
        }
    }

    protected Runnable getAsyncWriteHandler() {
        return new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                AsyncWriteToken token = null;
                try {
                    token = DefaultBroadcaster.this.asyncWriteQueue.take();
                    AtmosphereResource<?, ?> atmosphereResource = token.resource;
                    synchronized (atmosphereResource) {
                        DefaultBroadcaster.this.bc.getAsyncWriteService().submit(this);
                        if (((AtmosphereResourceImpl)AtmosphereResourceImpl.class.cast(token.resource)).isInScope()) {
                            DefaultBroadcaster.this.executeAsyncWrite(token.resource, token.msg, token.future);
                        }
                    }
                }
                catch (Throwable ex) {
                    if (!DefaultBroadcaster.this.started.get() || DefaultBroadcaster.this.destroyed.get()) {
                        logger.trace("Failed to execute a write operation. Broadcaster is destroyed or not yet started for Broadcaster {}", (Object)DefaultBroadcaster.this.getID(), (Object)ex);
                    }
                    logger.warn("This message {} will be lost", token.msg);
                    logger.debug("Failed to execute a write operation for Broadcaster {}", (Object)DefaultBroadcaster.this.getID(), (Object)ex);
                }
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void checkCachedAndPush(AtmosphereResource<?, ?> r, AtmosphereResourceEvent e) {
        this.retrieveTrackedBroadcast(r, e);
        if (e.getMessage() instanceof List && !((List)e.getMessage()).isEmpty()) {
            ((HttpServletRequest)HttpServletRequest.class.cast(r.getRequest())).setAttribute(CACHED, (Object)"true");
            AtmosphereResource<?, ?> atmosphereResource = r;
            synchronized (atmosphereResource) {
                this.broadcast(r, e);
            }
        }
    }

    protected boolean retrieveTrackedBroadcast(AtmosphereResource<?, ?> r, AtmosphereResourceEvent e) {
        List<Object> missedMsg = this.broadcasterCache.retrieveFromCache(r);
        if (!missedMsg.isEmpty()) {
            e.setMessage(missedMsg);
            return true;
        }
        return false;
    }

    protected void trackBroadcastMessage(AtmosphereResource<?, ?> r, Object msg) {
        this.broadcasterCache.addToCache(r, msg);
    }

    protected void broadcast(AtmosphereResource<?, ?> r, AtmosphereResourceEvent e) {
        try {
            r.getAtmosphereHandler().onStateChange(e);
        }
        catch (Throwable t) {
            this.onException(t, r);
        }
    }

    protected void onException(Throwable t, final AtmosphereResource<?, ?> r) {
        logger.debug("onException()", t);
        this.removeAtmosphereResource((AtmosphereResource)r);
        AtmosphereResourceEventImpl event = (AtmosphereResourceEventImpl)r.getAtmosphereResourceEvent();
        event.setThrowable(t);
        r.notifyListeners(event);
        r.removeEventListeners();
        if (this.bc != null && this.bc.getAsyncWriteService() != null) {
            this.bc.getAsyncWriteService().execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        r.resume();
                    }
                    catch (Throwable t) {
                        logger.warn("Was unable to resume a corrupted AtmosphereResource {}", (Object)r);
                        logger.warn("Cause", t);
                    }
                }
            });
        } else {
            r.resume();
        }
    }

    @Override
    public void setSuspendPolicy(long maxSuspendResource, Broadcaster.POLICY policy) {
        this.maxSuspendResource.set(maxSuspendResource);
        this.policy = policy;
    }

    @Override
    public <T> Future<T> broadcast(T msg) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"broadcast(T msg)");
            return null;
        }
        this.start();
        Object newMsg = this.filter(msg);
        if (newMsg == null) {
            return null;
        }
        BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg);
        this.messages.offer(new Entry(newMsg, null, f, msg));
        return f;
    }

    protected Object filter(Object msg) {
        BroadcastFilter.BroadcastAction a = this.bc.filter(msg);
        if (a.action() == BroadcastFilter.BroadcastAction.ACTION.ABORT || msg == null) {
            return null;
        }
        return a.message();
    }

    @Override
    public <T> Future<T> broadcast(T msg, AtmosphereResource<?, ?> r) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"broadcast(T msg, AtmosphereResource<?, ?> r");
            return null;
        }
        this.start();
        Object newMsg = this.filter(msg);
        if (newMsg == null) {
            return null;
        }
        BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg);
        this.messages.offer(new Entry(newMsg, r, f, msg));
        return f;
    }

    @Override
    public <T> Future<T> broadcastOnResume(T msg) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"broadcastOnResume(T msg)");
            return null;
        }
        this.start();
        Object newMsg = this.filter(msg);
        if (newMsg == null) {
            return null;
        }
        BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg);
        this.broadcastOnResume.offer(new Entry(newMsg, null, f, msg));
        return f;
    }

    protected void broadcastOnResume(AtmosphereResource<?, ?> r) {
        for (Entry e : this.broadcastOnResume) {
            e.multipleAtmoResources = r;
            this.push(e);
        }
        if (this.resources.isEmpty()) {
            this.broadcastOnResume.clear();
        }
    }

    @Override
    public <T> Future<T> broadcast(T msg, Set<AtmosphereResource<?, ?>> subset) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"broadcast(T msg, Set<AtmosphereResource<?, ?>> subset)");
            return null;
        }
        this.start();
        Object newMsg = this.filter(msg);
        if (newMsg == null) {
            return null;
        }
        BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg);
        this.messages.offer(new Entry(newMsg, subset, f, msg));
        return f;
    }

    @Override
    public AtmosphereResource<?, ?> addAtmosphereResource(AtmosphereResource<?, ?> r) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"addAtmosphereResource(AtmosphereResource<?, ?> r");
            return r;
        }
        this.start();
        if (this.scope == Broadcaster.SCOPE.REQUEST && this.requestScoped.getAndSet(true)) {
            throw new IllegalStateException("Broadcaster " + this + " cannot be used as its scope is set to REQUEST");
        }
        if (this.maxSuspendResource.get() > 0L && (long)this.resources.size() >= this.maxSuspendResource.get()) {
            if (this.policy == Broadcaster.POLICY.FIFO) {
                AtmosphereResource<?, ?> resource = this.resources.poll();
                try {
                    logger.warn("Too many resource. Forcing resume of {} ", resource);
                    resource.resume();
                }
                catch (Throwable t) {
                    logger.warn("failed to resume resource {} ", resource, (Object)t);
                }
            } else if (this.policy == Broadcaster.POLICY.REJECT) {
                throw new RejectedExecutionException(String.format("Maximum suspended AtmosphereResources %s", this.maxSuspendResource));
            }
        }
        if (this.resources.contains(r)) {
            return r;
        }
        if (this.resources.isEmpty()) {
            BroadcasterFactory.getDefault().add(this, this.name);
        }
        this.resources.add(r);
        this.checkCachedAndPush(r, r.getAtmosphereResourceEvent());
        return r;
    }

    public AtmosphereResource<?, ?> removeAtmosphereResource(AtmosphereResource r) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"removeAtmosphereResource(AtmosphereResource r)");
            return r;
        }
        if (!this.resources.contains(r)) {
            return null;
        }
        boolean removed = this.resources.remove(r);
        if (removed && this.resources.isEmpty()) {
            this.notifyEmptyListener();
            if (this.scope != Broadcaster.SCOPE.REQUEST && this.lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.EMPTY) {
                this.releaseExternalResources();
            } else if (this.lifeCyclePolicy.getLifeCyclePolicy() == BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.EMPTY_DESTROY) {
                this.notifyDestroyListener();
                BroadcasterFactory.getDefault().remove(this, this.name);
                this.destroy();
            }
        }
        return r;
    }

    private void notifyIdleListener() {
        for (BroadcasterLifeCyclePolicyListener b : this.lifeCycleListeners) {
            b.onIdle();
        }
    }

    private void notifyDestroyListener() {
        for (BroadcasterLifeCyclePolicyListener b : this.lifeCycleListeners) {
            b.onDestroy();
        }
    }

    private void notifyEmptyListener() {
        for (BroadcasterLifeCyclePolicyListener b : this.lifeCycleListeners) {
            b.onEmpty();
        }
    }

    @Override
    public void setBroadcasterConfig(BroadcasterConfig bc) {
        this.bc = bc;
    }

    @Override
    public BroadcasterConfig getBroadcasterConfig() {
        return this.bc;
    }

    @Override
    public <T> Future<T> delayBroadcast(T o) {
        return this.delayBroadcast(o, 0L, null);
    }

    @Override
    public <T> Future<T> delayBroadcast(final T o, long delay, TimeUnit t) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"delayBroadcast(final T o, long delay, TimeUnit t)");
            return null;
        }
        this.start();
        Object msg = this.filter(o);
        if (msg == null) {
            return null;
        }
        BroadcasterFuture<Object> future = new BroadcasterFuture<Object>(msg);
        final Entry e = new Entry(msg, null, future, o);
        if (delay > 0L) {
            ScheduledFuture f = this.bc.getScheduledExecutorService().schedule(new Callable<T>(){

                @Override
                public T call() throws Exception {
                    DefaultBroadcaster.this.delayedBroadcast.remove(e);
                    if (Callable.class.isAssignableFrom(o.getClass())) {
                        try {
                            Object r = ((Callable)Callable.class.cast(o)).call();
                            Object msg = DefaultBroadcaster.this.filter(r);
                            if (msg != null) {
                                Entry entry = new Entry(msg, null, null, r);
                                DefaultBroadcaster.this.push(entry);
                            }
                            return msg;
                        }
                        catch (Exception e1) {
                            logger.error("", (Object)e);
                        }
                    }
                    Object msg = DefaultBroadcaster.this.filter(o);
                    Entry e2 = new Entry(msg, null, null, o);
                    DefaultBroadcaster.this.push(e2);
                    return msg;
                }
            }, delay, t);
            e.future = new BroadcasterFuture<Object>(f, msg);
        }
        this.delayedBroadcast.offer(e);
        return future;
    }

    public Future<?> scheduleFixedBroadcast(Object o, long period, TimeUnit t) {
        return this.scheduleFixedBroadcast(o, 0L, period, t);
    }

    public Future<?> scheduleFixedBroadcast(final Object o, long waitFor, long period, TimeUnit t) {
        if (this.destroyed.get()) {
            logger.debug(DESTROYED, (Object)this.getID(), (Object)"scheduleFixedBroadcast(final Object o, long waitFor, long period, TimeUnit t)");
            return null;
        }
        this.start();
        if (period == 0L || t == null) {
            return null;
        }
        Object msg = this.filter(o);
        if (msg == null) {
            return null;
        }
        return this.bc.getScheduledExecutorService().scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                if (Callable.class.isAssignableFrom(o.getClass())) {
                    try {
                        Object r = ((Callable)Callable.class.cast(o)).call();
                        Object msg = DefaultBroadcaster.this.filter(r);
                        if (msg != null) {
                            Entry entry = new Entry(msg, null, null, r);
                            DefaultBroadcaster.this.push(entry);
                        }
                        return;
                    }
                    catch (Exception e) {
                        logger.error("", (Throwable)e);
                    }
                }
                Object msg = DefaultBroadcaster.this.filter(o);
                Entry e = new Entry(msg, null, null, o);
                DefaultBroadcaster.this.push(e);
            }
        }, waitFor, period, t);
    }

    public String toString() {
        return this.getClass().getName() + "@" + this.hashCode() + "\n" + "\n\tName: " + this.name + "\n" + "\n\tScope: " + (Object)((Object)this.scope) + "\n" + "\n\tBroasdcasterCache " + this.broadcasterCache + "\n" + "\n\tAtmosphereResource: " + this.resources.size() + "\n";
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        DefaultBroadcaster that = (DefaultBroadcaster)o;
        if (this.broadcastOnResume != null ? !this.broadcastOnResume.equals(that.broadcastOnResume) : that.broadcastOnResume != null) {
            return false;
        }
        if (this.broadcasterCache != null ? !this.broadcasterCache.equals(that.broadcasterCache) : that.broadcasterCache != null) {
            return false;
        }
        if (this.delayedBroadcast != null ? !this.delayedBroadcast.equals(that.delayedBroadcast) : that.delayedBroadcast != null) {
            return false;
        }
        if (this.destroyed != null ? !this.destroyed.equals(that.destroyed) : that.destroyed != null) {
            return false;
        }
        if (this.lifeCyclePolicy != null ? !this.lifeCyclePolicy.equals(that.lifeCyclePolicy) : that.lifeCyclePolicy != null) {
            return false;
        }
        if (this.name != null ? !this.name.equals(that.name) : that.name != null) {
            return false;
        }
        if (this.policy != that.policy) {
            return false;
        }
        if (this.requestScoped != null ? !this.requestScoped.equals(that.requestScoped) : that.requestScoped != null) {
            return false;
        }
        if (this.resources != null ? !this.resources.equals(that.resources) : that.resources != null) {
            return false;
        }
        if (this.scope != that.scope) {
            return false;
        }
        if (this.started != null ? !this.started.equals(that.started) : that.started != null) {
            return false;
        }
        return !(this.uri != null ? !this.uri.equals(that.uri) : that.uri != null);
    }

    public int hashCode() {
        int result = this.resources != null ? this.resources.hashCode() : 0;
        result = 31 * result + (this.started != null ? this.started.hashCode() : 0);
        result = 31 * result + (this.destroyed != null ? this.destroyed.hashCode() : 0);
        result = 31 * result + (this.scope != null ? this.scope.hashCode() : 0);
        result = 31 * result + (this.name != null ? this.name.hashCode() : 0);
        result = 31 * result + (this.delayedBroadcast != null ? this.delayedBroadcast.hashCode() : 0);
        result = 31 * result + (this.broadcastOnResume != null ? this.broadcastOnResume.hashCode() : 0);
        result = 31 * result + (this.broadcasterCache != null ? this.broadcasterCache.hashCode() : 0);
        result = 31 * result + (this.policy != null ? this.policy.hashCode() : 0);
        result = 31 * result + (this.requestScoped != null ? this.requestScoped.hashCode() : 0);
        result = 31 * result + (this.lifeCyclePolicy != null ? this.lifeCyclePolicy.hashCode() : 0);
        result = 31 * result + (this.uri != null ? this.uri.hashCode() : 0);
        return result;
    }

    private static final class AsyncWriteToken {
        final AtmosphereResource<?, ?> resource;
        final Object msg;
        final BroadcasterFuture future;

        public AsyncWriteToken(AtmosphereResource<?, ?> resource, Object msg, BroadcasterFuture future) {
            this.resource = resource;
            this.msg = msg;
            this.future = future;
        }

        public String toString() {
            return "AsyncWriteToken{resource=" + this.resource + ", msg=" + this.msg + ", future=" + this.future + '}';
        }
    }

    public static final class Entry {
        public Object message;
        public Object multipleAtmoResources;
        public BroadcasterFuture<?> future;
        public boolean writeLocally;
        public Object originalMessage;

        public Entry(Object message, Object multipleAtmoResources, BroadcasterFuture<?> future, Object originalMessage) {
            this.message = message;
            this.multipleAtmoResources = multipleAtmoResources;
            this.future = future;
            this.writeLocally = true;
            this.originalMessage = originalMessage;
        }

        public Entry(Object message, Object multipleAtmoResources, BroadcasterFuture<?> future, boolean writeLocally) {
            this.message = message;
            this.multipleAtmoResources = multipleAtmoResources;
            this.future = future;
            this.writeLocally = writeLocally;
        }

        public String toString() {
            return "Entry{message=" + this.message + ", multipleAtmoResources=" + this.multipleAtmoResources + ", future=" + this.future + '}';
        }
    }
}

