/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.shiro.event.support;

import org.apache.shiro.event.EventBus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * A default event bus implementation that synchronously publishes events to registered listeners. Listeners can be
 * registered or unregistered for events as necessary.
 * <p>
 * An event bus enables a publish/subscribe paradigm within Shiro - components can publish or consume events they
 * find relevant without needing to be tightly coupled to other components. This affords great
 * flexibility within Shiro by promoting loose coupling and high cohesion between components and a much safer
 * pluggable architecture that is more resilient to change over time.
 * <h2>Sending Events</h2>
 * If a component wishes to publish events to other components:
 * <pre>
 *     MyEvent myEvent = createMyEvent();
 *     eventBus.publish(myEvent);
 * </pre>
 * The event bus will determine the type of event and then dispatch the event to components that wish to receive
 * events of that type.
 * <h2>Receiving Events</h2>
 * A component can receive events of interest by doing the following.
 * <ol>
 * <li>For each type of event you wish to consume, create a public method that accepts a single event argument.
 * The method argument type indicates the type of event to receive.</li>
 * <li>Annotate each of these public methods with the {@link org.apache.shiro.event.Subscribe Subscribe} annotation.</li>
 * <li>Register the component with the event bus:
 * <pre>
 *     eventBus.register(myComponent);
 * </pre>
 * </li>
 * </ol>
 * After registering the component, when an event of a respective type is published, the component's
 * {@code Subscribe}-annotated method(s) will be invoked as expected.
 * <p>
 * This design (and its constituent helper components) was largely influenced by
 * Guava's <a href="http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/eventbus/EventBus.html">EventBus</a>
 * concept, although no code was shared/imported (even though Guava is Apache 2.0 licensed and could have
 * been used).
 * <p>
 * This implementation is thread-safe and may be used concurrently. Events are dispatched to a snapshot of the
 * subscriptions taken when {@link #publish(Object)} is called, and no internal lock is held while listeners run.
 * Consequently a listener may call {@link #register(Object)}, {@link #unregister(Object)} or
 * {@link #publish(Object)} on this bus from within its handler, and a subscriber that is unregistered while a
 * publish is in flight may still receive that one event.
 *
 * @since 1.3
 */
public class DefaultEventBus implements EventBus {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventBus.class);

    private static final String EVENT_LISTENER_ERROR_MSG = "Event listener processing failed. Listeners should "
            + "generally handle exceptions directly and not propagate to the event bus.";

    //this is stateless, we can retain a static final reference:
    private static final EventListenerComparator EVENT_LISTENER_COMPARATOR = new EventListenerComparator();

    //We want to preserve registration order to deliver events to objects in the order that they are registered
    //with the event bus.  This has the nice effect that any Shiro system-level components that are registered first
    //(likely to happen upon startup) have precedence over those registered by end-user components later.
    //
    //One might think that this could have been done by just using a ConcurrentSkipListMap (which is available only on
    //JDK 6 or later).  However, this approach requires the implementation of a Comparator to sort elements, and this
    //surfaces a problem: for any given random event listener, there isn't any guaranteed property to exist that can be
    //inspected to determine order of registration, since registration order is an artifact of this EventBus
    //implementation, not the listeners themselves.
    //
    //Therefore, we use a simple concurrent lock to wrap a LinkedHashMap - the LinkedHashMap retains insertion order
    //and the lock provides thread-safety in probably a much simpler mechanism than attempting to write a
    //EventBus-specific Comparator.  This technique is also likely to be faster than a ConcurrentSkipListMap, which
    //is about 3-5 times slower than a standard ConcurrentMap.
    final Map<Object, Subscription> registry;
    private final Lock registryReadLock;
    private final Lock registryWriteLock;

    //volatile so that a resolver installed by one thread is visible to registrations performed on another:
    private volatile EventListenerResolver eventListenerResolver;

    /**
     * Creates an event bus with an empty registry and an {@link AnnotationEventListenerResolver} as the
     * initial listener resolver.
     */
    public DefaultEventBus() {
        //not thread safe, so we need locks:
        this.registry = new LinkedHashMap<>();
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        this.registryReadLock = rwl.readLock();
        this.registryWriteLock = rwl.writeLock();
        this.eventListenerResolver = new AnnotationEventListenerResolver();
    }

    /**
     * Returns the resolver used by {@link #register(Object)} to discover the event listeners of a subscriber.
     *
     * @return the current event listener resolver
     */
    public EventListenerResolver getEventListenerResolver() {
        return eventListenerResolver;
    }

    /**
     * Replaces the resolver used by {@link #register(Object)}. Instances that are already registered keep the
     * listeners that were resolved for them at registration time.
     *
     * @param eventListenerResolver the resolver to use for subsequent registrations
     */
    public void setEventListenerResolver(EventListenerResolver eventListenerResolver) {
        this.eventListenerResolver = eventListenerResolver;
    }

    /**
     * Synchronously delivers the event to every current subscription, in registration order. A {@code null}
     * event is logged and ignored.
     *
     * @param event the event to publish
     */
    public void publish(Object event) {
        if (event == null) {
            LOGGER.info("Received null event for publishing. Ignoring and returning.");
            return;
        }

        //Copy the subscriptions while holding the read lock, then dispatch with no lock held.
        //
        //Listener code is outside of our control.  If it were invoked while the read lock is held, a listener that
        //calls register(..) or unregister(..) on this bus from within its handler would request the write lock on
        //the same thread.  A ReentrantReadWriteLock cannot upgrade a read lock to a write lock, so that thread would
        //block forever.  It would also make every registration wait for the slowest listener to finish.
        //
        //The copy costs one small allocation per publish, which is an acceptable price for removing that hazard.
        final List<Subscription> snapshot;
        registryReadLock.lock();
        try {
            snapshot = new ArrayList<>(this.registry.values());
        } finally {
            registryReadLock.unlock();
        }

        for (Subscription subscription : snapshot) {
            subscription.onEvent(event);
        }
    }

    /**
     * Registers the instance so that its resolved event listeners receive subsequently published events.
     * <p>
     * Any existing registration of the same instance is removed first, so re-registering moves the instance to the
     * end of the delivery order. If the configured resolver returns no listeners for the instance, a warning is
     * logged and the instance is left unregistered. A {@code null} instance is logged and ignored.
     *
     * @param instance the subscriber to register
     */
    public void register(Object instance) {
        if (instance == null) {
            LOGGER.info("Received null instance for event listener registration. Ignoring registration request.");
            return;
        }

        //a plain put(..) on an existing key would keep its old position, so drop any previous entry first:
        unregister(instance);

        List<EventListener> listeners = getEventListenerResolver().getEventListeners(instance);

        if (listeners == null || listeners.isEmpty()) {
            LOGGER.warn("Unable to resolve event listeners for subscriber instance [{}]. Ignoring registration request.",
                    instance);
            return;
        }

        Subscription subscription = new Subscription(listeners);

        this.registryWriteLock.lock();
        try {
            this.registry.put(instance, subscription);
        } finally {
            this.registryWriteLock.unlock();
        }
    }

    /**
     * Removes the instance from the registry so that it is not included in subsequent publishes. Does nothing if
     * the instance is {@code null} or is not registered.
     *
     * @param instance the subscriber to unregister
     */
    public void unregister(Object instance) {
        if (instance == null) {
            return;
        }
        this.registryWriteLock.lock();
        try {
            this.registry.remove(instance);
        } finally {
            this.registryWriteLock.unlock();
        }
    }

    /**
     * The listeners resolved for a single registered instance, held in the order produced by
     * {@link #EVENT_LISTENER_COMPARATOR}.
     */
    private static class Subscription {

        private final List<EventListener> listeners;

        /**
         * Takes a sorted private copy of the listeners; the supplied list is not modified.
         *
         * @param listeners the listeners resolved for the registered instance
         */
        Subscription(List<EventListener> listeners) {
            List<EventListener> toSort = new ArrayList<>(listeners);
            toSort.sort(EVENT_LISTENER_COMPARATOR);
            this.listeners = toSort;
        }

        /**
         * Offers the event to each listener in sorted order. For any one target object only the first listener that
         * accepts the event is invoked. Anything thrown by a listener's {@code onEvent} is logged and does not
         * prevent delivery to the remaining listeners.
         *
         * @param event the event being published
         */
        public void onEvent(Object event) {

            //targets that have already been offered this event by an earlier (higher-sorted) listener:
            Set<Object> delivered = new HashSet<>();

            for (EventListener listener : this.listeners) {
                //method-based listeners are de-duplicated by the object that owns the method; any other listener
                //is its own target:
                Object target = listener;
                if (listener instanceof SingleArgumentMethodEventListener) {
                    SingleArgumentMethodEventListener singleArgListener = (SingleArgumentMethodEventListener) listener;
                    target = singleArgListener.getTarget();
                }
                if (listener.accepts(event) && !delivered.contains(target)) {
                    try {
                        listener.onEvent(event);
                    } catch (Throwable t) {
                        LOGGER.warn(EVENT_LISTENER_ERROR_MSG, t);
                    }
                    //marked as delivered even when the listener failed, so the target is not offered the event again:
                    delivered.add(target);
                }
            }
        }
    }
}