001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.shiro.event.support;
020
021import org.apache.shiro.event.EventBus;
022import org.slf4j.Logger;
023import org.slf4j.LoggerFactory;
024
025import java.util.ArrayList;
026import java.util.HashSet;
027import java.util.LinkedHashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.concurrent.locks.Lock;
032import java.util.concurrent.locks.ReentrantReadWriteLock;
033
034/**
035 * A default event bus implementation that synchronously publishes events to registered listeners.  Listeners can be
036 * registered or unregistered for events as necessary.
037 * <p/>
038 * An event bus enables a publish/subscribe paradigm within Shiro - components can publish or consume events they
039 * find relevant without needing to be tightly coupled to other components.  This affords great
040 * flexibility within Shiro by promoting loose coupling and high cohesion between components and a much safer
041 * pluggable architecture that is more resilient to change over time.
042 * <h2>Sending Events</h2>
043 * If a component wishes to publish events to other components:
044 * <pre>
045 *     MyEvent myEvent = createMyEvent();
046 *     eventBus.publish(myEvent);
047 * </pre>
048 * The event bus will determine the type of event and then dispatch the event to components that wish to receive
049 * events of that type.
050 * <h2>Receiving Events</h2>
051 * A component can receive events of interest by doing the following.
052 * <ol>
053 * <li>For each type of event you wish to consume, create a public method that accepts a single event argument.
054 * The method argument type indicates the type of event to receive.</li>
055 * <li>Annotate each of these public methods with the {@link org.apache.shiro.event.Subscribe Subscribe} annotation.</li>
056 * <li>Register the component with the event bus:
057 * <pre>
058 *         eventBus.register(myComponent);
059 *     </pre>
060 * </li>
061 * </ol>
062 * After registering the component, when when an event of a respective type is published, the component's
063 * {@code Subscribe}-annotated method(s) will be invoked as expected.
064 * <p>
065 * This design (and its constituent helper components) was largely influenced by
066 * Guava's <a href="http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/eventbus/EventBus.html">EventBus</a>
067 * concept, although no code was shared/imported (even though Guava is Apache 2.0 licensed and could have
068 * been used).
069 * <p>
070 * This implementation is thread-safe and may be used concurrently.
071 *
072 * @since 1.3
073 */
074public class DefaultEventBus implements EventBus {
075
076    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventBus.class);
077
078    private static final String EVENT_LISTENER_ERROR_MSG = "Event listener processing failed.  Listeners should "
079            + "generally handle exceptions directly and not propagate to the event bus.";
080
081    //this is stateless, we can retain a static final reference:
082    private static final EventListenerComparator EVENT_LISTENER_COMPARATOR = new EventListenerComparator();
083
084    //We want to preserve registration order to deliver events to objects in the order that they are registered
085    //with the event bus.  This has the nice effect that any Shiro system-level components that are registered first
086    //(likely to happen upon startup) have precedence over those registered by end-user components later.
087    //
088    //One might think that this could have been done by just using a ConcurrentSkipListMap (which is available only on
089    //JDK 6 or later).  However, this approach requires the implementation of a Comparator to sort elements, and this
090    //surfaces a problem: for any given random event listener, there isn't any guaranteed property to exist that can be
091    //inspected to determine order of registration, since registration order is an artifact of this EventBus
092    //implementation, not the listeners themselves.
093    //
094    //Therefore, we use a simple concurrent lock to wrap a LinkedHashMap - the LinkedHashMap retains insertion order
095    //and the lock provides thread-safety in probably a much simpler mechanism than attempting to write a
096    //EventBus-specific Comparator.  This technique is also likely to be faster than a ConcurrentSkipListMap, which
097    //is about 3-5 times slower than a standard ConcurrentMap.
098    final Map<Object, Subscription> registry;
099    private final Lock registryReadLock;
100    private final Lock registryWriteLock;
101    private EventListenerResolver eventListenerResolver;
102
103    public DefaultEventBus() {
104        //not thread safe, so we need locks:
105        this.registry = new LinkedHashMap<Object, Subscription>();
106        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
107        this.registryReadLock = rwl.readLock();
108        this.registryWriteLock = rwl.writeLock();
109        this.eventListenerResolver = new AnnotationEventListenerResolver();
110    }
111
112    public EventListenerResolver getEventListenerResolver() {
113        return eventListenerResolver;
114    }
115
116    public void setEventListenerResolver(EventListenerResolver eventListenerResolver) {
117        this.eventListenerResolver = eventListenerResolver;
118    }
119
120    public void publish(Object event) {
121        if (event == null) {
122            LOGGER.info("Received null event for publishing.  Ignoring and returning.");
123            return;
124        }
125
126        registryReadLock.lock();
127        try {
128            //performing the entire iteration within the lock will be a slow operation if the registry has a lot of
129            //contention.  However, it is expected that the very large majority of cases the registry will be
130            //read-mostly with very little writes (registrations or removals) occurring during a typical application
131            //lifetime.
132            //
133            //The alternative would be to copy the registry.values() collection to a new LinkedHashSet within the lock
134            //only and the iteration on this new collection could be outside the lock.  This has the performance penalty
135            //however of always creating a new collection every time an event is published,  which could be more
136            //costly for the majority of applications, especially if the number of listeners is large.
137            //
138            //Finally, the read lock is re-entrant, so multiple publish calls will be
139            //concurrent without penalty since publishing is a read-only operation on the registry.
140
141            for (Subscription subscription : this.registry.values()) {
142                subscription.onEvent(event);
143            }
144        } finally {
145            registryReadLock.unlock();
146        }
147    }
148
149    public void register(Object instance) {
150        if (instance == null) {
151            LOGGER.info("Received null instance for event listener registration.  Ignoring registration request.");
152            return;
153        }
154
155        unregister(instance);
156
157        List<EventListener> listeners = getEventListenerResolver().getEventListeners(instance);
158
159        if (listeners == null || listeners.isEmpty()) {
160            LOGGER.warn("Unable to resolve event listeners for subscriber instance [{}]. Ignoring registration request.",
161                    instance);
162            return;
163        }
164
165        Subscription subscription = new Subscription(listeners);
166
167        this.registryWriteLock.lock();
168        try {
169            this.registry.put(instance, subscription);
170        } finally {
171            this.registryWriteLock.unlock();
172        }
173    }
174
175    public void unregister(Object instance) {
176        if (instance == null) {
177            return;
178        }
179        this.registryWriteLock.lock();
180        try {
181            this.registry.remove(instance);
182        } finally {
183            this.registryWriteLock.unlock();
184        }
185    }
186
187    private static class Subscription {
188
189        private final List<EventListener> listeners;
190
191        Subscription(List<EventListener> listeners) {
192            List<EventListener> toSort = new ArrayList<EventListener>(listeners);
193            toSort.sort(EVENT_LISTENER_COMPARATOR);
194            this.listeners = toSort;
195        }
196
197        public void onEvent(Object event) {
198
199            Set<Object> delivered = new HashSet<Object>();
200
201            for (EventListener listener : this.listeners) {
202                Object target = listener;
203                if (listener instanceof SingleArgumentMethodEventListener) {
204                    SingleArgumentMethodEventListener singleArgListener = (SingleArgumentMethodEventListener) listener;
205                    target = singleArgListener.getTarget();
206                }
207                if (listener.accepts(event) && !delivered.contains(target)) {
208                    try {
209                        listener.onEvent(event);
210                    } catch (Throwable t) {
211                        LOGGER.warn(EVENT_LISTENER_ERROR_MSG, t);
212                    }
213                    delivered.add(target);
214                }
215            }
216        }
217    }
218}