何为事件监听者模式 ?
第一就是为啥我强调事件二字 ,因为他是目标 . 在我们开发中绝对见到过一堆后缀是 Listener
的类, 这个就是监听者模式, 监听者模式是一种 CS开发架构
,很好的做了一种设计的解耦,监听者注册到一个邮局中,订阅某种事件(提前说好了), 邮局会按需求发布消息, 监听者会及时收到消息来处理 . 其中整个Java开发环境中 , JDK已经帮我们定义好了接口 , Spring就是基于JDK接口下实现的, Guava则是另一种实现方式, 各有千秋 , 大家看看吧 , 到底是回调好还是阻塞, 还是Guava这种方式呢, 他和观察者模式有何区别呢 ? 我们有机会在讲
public class EventObject implements java.io.Serializable { protected transient Object source; public EventObject(Object source) { if (source == null) throw new IllegalArgumentException("null source"); this.source = source; } ........... 其他省略 } 复制代码
public interface EventListener { } 复制代码
我们发现 java 提供的只提供了一个事件对象 ,和一个事件监听器 ,所以需要我们遵守这个规范去开发
一般情况下 都会设置成一个 Object 类型的 , 不需要我们去设计一个,为了体现设计模式的角色,我们就设计了一个
@ToString @Setter @Getter public class EventSource { private String name; private String info; } 复制代码
这里我们继承了 EventObject , 只是简单的实现了一下 , 并没有做过多的包装
public class CoreEventObject extends EventObject { public CoreEventObject(EventSource source) { super(source); } } 复制代码
这里我们真正的监听者 , 一般情况下都需要设计成一个 函数式接口 , 我这个是和Spring框架学习的 , 因为函数式接口才能体现回调 ,
@FunctionalInterface public interface CoreEventListener<E extends CoreEventObject> extends EventListener { void onEventObject(E event); } 复制代码
事件发布者 ,因为没有发布的事件对象, 哪来的监听
public class EventPublisher<E extends CoreEventObject> { private CoreEventListener<E> listener; public EventPublisher(CoreEventListener<E> listener) { this.listener = listener; } public void publish(E object){ System.out.println("发布事件 : " + object); // 传给 CoreEventListener listener.onEventObject(object); } } 复制代码
public class TestDemo { public static void main(String[] args) { // 1. 创建一个事件发布者 EventPublisher<CoreEventObject> publisher = new EventPublisher<>(new CoreEventListener<CoreEventObject>() { @Override public void onEventObject(CoreEventObject event) { System.out.println("接收到事件源 : " + event.getSource() + " , 当前线程 : " + Thread.currentThread().getName()); } }); // 2. 发布一个事件对象 publisher.publish(getCoreEventObject()); } private static CoreEventObject getCoreEventObject(){ ..... 此处省略 return eventObject; } } 复制代码
输出结果 :
发布事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019)] 接收到事件源 : EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019) , 当前线程 : main 复制代码
我们发现我们成功的接收到了事件对象 和 事件源 , 这个就是钩子函数的魅力 . 其实你只是做了一个事件发布你无心观察其他的东西 , 只需要一个监听者就可以做到监听了 , 这样你的事件发布 和 监听 完全就解耦了 .其实底层就是一个地址引用 .
很多场景下,我们的发布事件和监听事件完全在两个线程中,那么我们如何拿到事件对象呢 ?
如果我们简单使用一下 , 会这么写 ?
public class TestEventListener implements CoreEventListener<CoreEventObject> { private CoreEventObject object; @Override public void onEventObject(CoreEventObject object) { // 赋值给成员变量 this.object = object; } // 获取成员变量 public CoreEventObject getObject() { return object; } } 复制代码
测试一下 :
public class TestDemo { public static void main(String[] args) { TestEventListener listener = new TestEventListener(); CoreEventObject object = listener.getObject(); // 先去拿 ,后去发布 System.out.println(object.getSource()); EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener); publisher.publish(getCoreEventObject()); } private static CoreEventObject getCoreEventObject(){ .... return eventObject; } } 复制代码
输出结果
Exception in thread "main" java.lang.NullPointerException at com.example.listener_design_pattern.TestDemo.main(TestDemo.java:28) 复制代码
有些人就会说 , 你这不对哇 ,你当然拿不到了 ,因为人家还没发布了 ,但是在多线程 ,在解耦的情况下 ,你哪知道对面何时发布结束了 , 你再去拿呢 ? 那就需要java的多线程知识了 ,Future 给我们带来了提醒 , 就是阻塞的思想 , 只有监听者真正的收到对象 , 我们才能去拿 .
了解过我前面提到的那一节 FutureTask
是如何实现的 ,我觉得问题就迎刃而解了 .
public class TestEventListener implements CoreEventListener<CoreEventObject> { private CoreEventObject object; /** * 当 X = 0 ,代表 obj还没有初始化了 * 当 x = 1 , 代表 obj 以及初始化了 , 已经接收到了 */ private static volatile int x = 0; @Override public void onEventObject(CoreEventObject object) { this.object = object; // 收到改成 1 x = 1; } public CoreEventObject getObject() { while (true) { if (x == 1) { break; } } // 拿到对象,再设置为1 x = 0; return object; } } 复制代码
由于这个解决方案,会使得执行 getObject()
的线程一直的阻塞下去,就是死循环下去,我们必须一个线程去执行这个方法 ,
public class TestDemo { public static void main(String[] args) { TestEventListener listener = new TestEventListener(); // 新建一个线程去接收 Thread thread = new Thread(() -> { System.out.println("我开始接收对象 : " + System.currentTimeMillis()); CoreEventObject object = listener.getObject(); System.out.println("成功接收对象 : "+object.getSource()); }); thread.start(); // 新建一个线程去发布 EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener); new Thread(()->{ publisher.publish(getCoreEventObject()); }).start(); } private static CoreEventObject getCoreEventObject(){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } EventSource source = new EventSource(); source.setName("事件源"); source.setInfo("" + System.currentTimeMillis()); return new CoreEventObject(source); } } 复制代码
输出结果 :
我开始接收对象 : 1573282924555 发布事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=1573282925590)] 成功接收对象 : EventSource(name=事件源, info=1573282925590) 复制代码
我们我们接收的时候是在1573282924555的时间搓 , 而真正拿到的对象确实在1573282925590发布的 , 这个就完全在俩时间轴上,所以我们成功的解决了问题 .
Class to be extended by all application events. Abstract as it doesn't make sense for generic events to be published directly.
此类被所有的 application events
所继承 。抽象的原因是因为直接发布这个 ApplicationEvent
是没有意义的。
Interface to be implemented by application event listeners. Based on the standard java.util.EventListener interface for the Observer design pattern.
这个接口被所有的 application event listeners.
所实现 , 基于Java的 java.util.EventListener
接口规范
我们有一个需求就是 ,我们有一个服务会从远程不断的去拉去配置信息 ,一旦有改变就会发布配置信息 .
@ToString @Setter @Getter public class Config { private String namespace; private Map<String, Object> info; } 复制代码
// 这个注解,我们是根据Spring源码看到的 , 所以一致性,我就加了 @SuppressWarnings("serial") public class ConfigEvent extends ApplicationEvent { public ConfigEvent(Config source) { super(source); } } 复制代码
@Component public class ConfigEventListener implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean { @Override public void onApplicationEvent(ConfigEvent event) { System.out.println("接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName()); } // 保证执行顺序 , 多个 ConfigEventListener就需要实现这个接口 @Override public int getOrder() { return Ordered.HIGHEST_PRECEDENCE; } // 初始化以后要做什么 ? @Override public void afterPropertiesSet() throws Exception { System.out.println("初始化当前ConfigEventListener"); } } 复制代码
@Service public class ConfigServer { // 注入applicationContext,因为只有他才可以执行发布事件 @Autowired private ApplicationContext applicationContext; // 这个是开启异步 ,后面会说到 // @Async public void publishConfig(){ // 需要发布 --- > 改变的事件 System.out.println("发布事件成功 , 当前线程 : "+Thread.currentThread().getName()); applicationContext.publishEvent(getChange()); } public ConfigEvent getChange(){ Config config = new Config(); config.setNamespace("application"); HashMap<String, Object> conf = new HashMap<>(); conf.put("server.port", 8088); config.setInfo(conf); return new ConfigEvent(config); } } 复制代码
@SpringBootApplication public class SpringListenerApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(SpringListenerApplication.class, args); } @Autowired private ConfigServer server; @Override public void run(String... args) throws Exception { server.publishConfig(); } } 复制代码
输出结果 :
..... 初始化当前ConfigEventListener .... 发布事件成功 , 当前线程 : main 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : main 复制代码
所以一个 Spring-Boot
的事件监听还是很简单的 ,类比到 Spring
一个道理,相信懂得人都知道 . 但是又一个问题是我们的 发布和监听都是 main线程 ,不好吧 ,玩意有很多事件了 ?
需要两个注解 @EnableAsync
启动Async功能 , 和 @Async
某个方法使用异步执行
发布事件成功 , 当前线程 : SimpleAsyncTaskExecutor-1 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : SimpleAsyncTaskExecutor-1 复制代码
我们发现就出现了线程池执行 , 这个理的线程池 ,是可以进行配置的 , 只需要我们显式的注入下面这个 SimpleAsyncTaskExecutor
Bean 就可以了
@Bean public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() { SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(); // 需要传入一个 ThreadFactory实现类 , 所以看过我前面写的文章应该会写这个,比如 JUC- Executor那节 executor.setThreadFactory(new MyThreadFactory("anthony")); return executor; } 复制代码
输出结果 :
发布事件成功 , 当前线程 : anthony-1 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1 复制代码
你可以跟我一样选择实现 ApplicationListener
和 Ordered
,或者你可以直接实现 SmartApplicationListener
都一样的哈,没有哪个好哪个不好
监听器 一 :
@Component public class ConfigEventListenerStart implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean { @Override public void onApplicationEvent(ConfigEvent event) { System.out.println("ConfigEventListenerStart 接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName()); } @Override public int getOrder() { return Ordered.HIGHEST_PRECEDENCE; } @Override public void afterPropertiesSet() throws Exception { System.out.println("初始化当前监听器 : " + this.toString()); } } 复制代码
监听器 二 :
@Component public class ConfigEventListenerEnd implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean { @Override public void onApplicationEvent(ConfigEvent event) { System.out.println("ConfigEventListenerEnd 接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName()); } @Override public int getOrder() { return Ordered.HIGHEST_PRECEDENCE-1; } @Override public void afterPropertiesSet() throws Exception { System.out.println("初始化当前监听器 : " + this.toString()); } } 复制代码
输出结果 :
初始化当前监听器 : com.example.springlistener.listener.ConfigEventListenerEnd@6b54655f 初始化当前监听器 : com.example.springlistener.listener.ConfigEventListenerStart@665e9289 ..... 发布事件成功 , 当前线程 : anthony-1 ConfigEventListenerStart 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1 ConfigEventListenerEnd 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1 复制代码
Guava的EventBus 就是一个很好的事件注册发布的管理工具 , 他属于一种推送的模式 , 跟spring的很相似,
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>28.0-jre</version> </dependency> 复制代码
主要的对象就是 , EventBus
事件总线, 他是管理所有监听者(或者叫做订阅者) ,通过 EventBus#register
或者 EventBus#unRegister
来管理的 , 同时监听者要有监听的事件 , 这里是基于方法级别的 (注意方法只能有一个参数,就是监听的事件), 需要在方法上加上 @Subscribe
注解来表示监听 , EventBus
可以通过 EventBus#post
方法来发布事件 , 对应类型的监听者就会收到 . 同时 EventBus
可以处理异常
public class QuicklyStart { public static void main(String[] args) { // 创建一个 事件总线 EventBus bus = new EventBus(new SubscriberExceptionHandler() { @Override public void handleException(Throwable exception, SubscriberExceptionContext context) { // 处理订阅者异常信息 System.out.println("异常信息 : "+exception.getMessage() + ", 异常事件 : " + context.getEvent()); } }); // 注册你的监听器 , 其实更加准确来说是订阅者 , 他属于一种发布订阅模式 bus.register(new EventListener()); // 事件总线发布事件 bus.post("sb"); // 事件总线发布事件 bus.post(new Event("hello Guava")); } } /** * 事件源 */ class Event { String msg; public Event(String msg) { this.msg = msg; } @Override public String toString() { return "Event{" + "msg='" + msg + '/'' + '}'; } } /** * 监听器 */ class EventListener { /** * {@link Subscribe} 一个这个代表一个订阅者,EventBus会将符合的事件发布到对应的订阅者上 , 但是不支持java的基本数据类型, int 之类的 * * @param event */ @Subscribe public void onEvent(Event event) { System.out.println("当前线程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event); } @Subscribe public void onStringEvent(String event) { error(); // 模拟异常 System.out.println("当前线程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event); } private void error() { int i = 1 / 0; } } 复制代码
输出 :
异常信息 : / by zero, 异常事件 : sb 当前线程 : main, 接收到事件 : Event{msg='hello Guava'} 复制代码
其实很简单 , 第一注册的时候 :
/** Registers all subscriber methods on the given listener object. */ void register(Object listener) { // 其实就是注解扫描 , 然后把元信息都给整出来 Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); // 查找有没有 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); // 没有创建一个对象 if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } // 添加进去 ,其实是 Subscriber对象 , 把method信息, 对象信息全部封装进去了 eventSubscribers.addAll(eventMethodsInListener); } } 复制代码
第二就是 发布
public void post(Object event) { // 根据事件获取对应的 Subscriber Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { // 发布出去 dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } } @Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); // 获取当前线程的队列 ,用ThreadLocal维护的线程安全, 其实是为了安全 Queue<Event> queueForThread = queue.get(); // 创建一个事件对象 queueForThread.offer(new Event(event, subscribers)); if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { // 处理 nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } } /** Dispatches {@code event} to this subscriber using the proper executor. */ final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { // 关键点 ,就是这 invokeSubscriberMethod(event); } catch (InvocationTargetException e) { // 出现异常直接就 ... 调用异常回调方法了 bus.handleSubscriberException(e.getCause(), context(event)); } } }); } @VisibleForTesting void invokeSubscriberMethod(Object event) throws InvocationTargetException { try { // 原来如此 , .........method.invok 真.... 所以可以抓取异常 method.invoke(target, checkNotNull(event)); } catch (IllegalArgumentException e) { throw new Error("Method rejected target/argument: " + event, e); } catch (IllegalAccessException e) { throw new Error("Method became inaccessible: " + event, e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } } 复制代码
所以这个玩意很简单 , 原理一看就分析出来了