转载

设计模式 - 事件监听者模式 - JDK & Spring & Guava 各有千秋

设计模式 - 事件监听者模式 - JDK & Spring & Guava 各有千秋

​ 何为事件监听者模式 ?

​ 第一就是为啥我强调事件二字 ,因为他是目标 . 在我们开发中绝对见到过一堆后缀是 Listener 的类, 这个就是监听者模式, 监听者模式是一种 CS开发架构 ,很好的做了一种设计的解耦,监听者注册到一个邮局中,订阅某种事件(提前说好了), 邮局会按需求发布消息, 监听者会及时收到消息来处理 . 其中整个Java开发环境中 , JDK已经帮我们定义好了接口 , Spring就是基于JDK接口下实现的, Guava则是另一种实现方式, 各有千秋 , 大家看看吧 , 到底是回调好还是阻塞, 还是Guava这种方式呢, 他和观察者模式有何区别呢 ? 我们有机会在讲

1. Java原生规范

1. EventObject

​ 事件对象 , 他需要一个事件源 , 用构造函数传递的

public class EventObject implements java.io.Serializable {
    protected transient Object  source;

    public EventObject(Object source) {
        if (source == null)
            throw new IllegalArgumentException("null source");

        this.source = source;
    }
    ........... 其他省略 
}
复制代码

2. EventListener

​ 事件监听者,他是负责监听事件的 , JAVA提供的是一个空接口, 让我们根绝需求写

public interface EventListener {
}
复制代码

3. 总结

我们发现 java 提供的只提供了一个事件对象 ,和一个事件监听器 ,所以需要我们遵守这个规范去开发

2. Java规范设计一个监听者模式 - 基于回调模式

1. 事件源 - EventSource

一般情况下 都会设置成一个 Object 类型的 , 不需要我们去设计一个,为了体现设计模式的角色,我们就设计了一个

@ToString
@Setter
@Getter
public class EventSource {
    private String name;
    private String info;
}
复制代码

2. 事件对象 - EventObject

这里我们继承了 EventObject , 只是简单的实现了一下 , 并没有做过多的包装

public class CoreEventObject extends EventObject {
    public CoreEventObject(EventSource source) {
        super(source);
    }
}
复制代码

3. 事件监听者 - EventListener

这里我们真正的监听者 , 一般情况下都需要设计成一个 函数式接口 , 我这个是和Spring框架学习的 , 因为函数式接口才能体现回调 ,

@FunctionalInterface
public interface CoreEventListener<E extends CoreEventObject> extends EventListener {

    void onEventObject(E event);
}
复制代码

4. 事件发布者 - EventPublisher

事件发布者 ,因为没有发布的事件对象, 哪来的监听

public class EventPublisher<E extends CoreEventObject> {

    private CoreEventListener<E> listener;


    public EventPublisher(CoreEventListener<E> listener) {
        this.listener = listener;
    }

    
    public void publish(E object){
        System.out.println("发布事件 : " + object);
        // 传给 CoreEventListener
        listener.onEventObject(object);
    }
}
复制代码

5. 测试Demo

public class TestDemo {
    public static void main(String[] args) {
        // 1. 创建一个事件发布者
        EventPublisher<CoreEventObject> publisher = new EventPublisher<>(new CoreEventListener<CoreEventObject>() {
            @Override
            public void onEventObject(CoreEventObject event) {
                System.out.println("接收到事件源 : " + event.getSource() + " , 当前线程 : " + Thread.currentThread().getName()); 
            }
        });

        // 2. 发布一个事件对象 
        publisher.publish(getCoreEventObject());
    }

    private static CoreEventObject getCoreEventObject(){
        ..... 此处省略 
        return eventObject;
    }
}
复制代码

输出结果 :

发布事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019)]
接收到事件源 : EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019) , 当前线程 : main
复制代码

我们发现我们成功的接收到了事件对象 和 事件源 , 这个就是钩子函数的魅力 . 其实你只是做了一个事件发布你无心观察其他的东西 , 只需要一个监听者就可以做到监听了 , 这样你的事件发布 和 监听 完全就解耦了 .其实底层就是一个地址引用 .

3. 回调函数存在的问题 ?

1. 我们的问题 ?

很多场景下,我们的发布事件和监听事件完全在两个线程中,那么我们如何拿到事件对象呢 ?

如果我们简单使用一下 , 会这么写 ?

public class TestEventListener implements CoreEventListener<CoreEventObject> {

    private CoreEventObject object;

    @Override
    public void onEventObject(CoreEventObject object) {
 		// 赋值给成员变量 
        this.object = object;
    }

    // 获取成员变量 
    public CoreEventObject getObject() {
        return object;
    }
}
复制代码

测试一下 :

public class TestDemo {

    public static void main(String[] args) {

        TestEventListener listener = new TestEventListener();
        CoreEventObject object = listener.getObject();
        // 先去拿 ,后去发布
        System.out.println(object.getSource());

        EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener);
        publisher.publish(getCoreEventObject());
    }


    private static CoreEventObject getCoreEventObject(){
			.... 
        return eventObject;
    }

}

复制代码

输出结果

Exception in thread "main" java.lang.NullPointerException
	at com.example.listener_design_pattern.TestDemo.main(TestDemo.java:28)
复制代码

有些人就会说 , 你这不对哇 ,你当然拿不到了 ,因为人家还没发布了 ,但是在多线程 ,在解耦的情况下 ,你哪知道对面何时发布结束了 , 你再去拿呢 ? 那就需要java的多线程知识了 ,Future 给我们带来了提醒 , 就是阻塞的思想 , 只有监听者真正的收到对象 , 我们才能去拿 .

2. 解决问题

了解过我前面提到的那一节 FutureTask 是如何实现的 ,我觉得问题就迎刃而解了 .

public class TestEventListener implements CoreEventListener<CoreEventObject> {
    private CoreEventObject object;

    /**
     * 当 X = 0 ,代表 obj还没有初始化了
     * 当 x = 1 , 代表 obj 以及初始化了 , 已经接收到了
     */
    private static volatile int x = 0;

    @Override
    public void onEventObject(CoreEventObject object) {
        this.object = object;
        // 收到改成 1
        x = 1;
    }

    public CoreEventObject getObject() {
        while (true) {
            if (x == 1) {
                break;
            }
        }
        // 拿到对象,再设置为1
        x = 0;
        return object;
    }
}
复制代码

由于这个解决方案,会使得执行 getObject() 的线程一直的阻塞下去,就是死循环下去,我们必须一个线程去执行这个方法 ,

public class TestDemo {
    public static void main(String[] args) {
        TestEventListener listener = new TestEventListener();
        // 新建一个线程去接收
        Thread thread = new Thread(() -> {
            System.out.println("我开始接收对象 : " + System.currentTimeMillis());
            CoreEventObject object = listener.getObject();
            System.out.println("成功接收对象 : "+object.getSource());
        });
        thread.start();
		// 新建一个线程去发布
        EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener);
        new Thread(()->{
            publisher.publish(getCoreEventObject());
        }).start();
    }

    private static CoreEventObject getCoreEventObject(){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        EventSource source = new EventSource();
        source.setName("事件源");
        source.setInfo("" + System.currentTimeMillis());
        return new CoreEventObject(source);
    }
}
复制代码

输出结果 :

我开始接收对象 : 1573282924555
发布事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=1573282925590)]
成功接收对象 : EventSource(name=事件源, info=1573282925590)
复制代码

我们我们接收的时候是在1573282924555的时间搓 , 而真正拿到的对象确实在1573282925590发布的 , 这个就完全在俩时间轴上,所以我们成功的解决了问题 .

4. Spring 中的 ApplicationListener

1. ApplicationEvent

​ Class to be extended by all application events. Abstract as it doesn't make sense for generic events to be published directly.

​ 此类被所有的 application events 所继承 。抽象的原因是因为直接发布这个 ApplicationEvent 是没有意义的。

2. ApplicationListener

​ Interface to be implemented by application event listeners. Based on the standard java.util.EventListener interface for the Observer design pattern.

​ 这个接口被所有的 application event listeners. 所实现 , 基于Java的 java.util.EventListener 接口规范

3. 开始使用

我们有一个需求就是 ,我们有一个服务会从远程不断的去拉去配置信息 ,一旦有改变就会发布配置信息 .

1. Config - 事件源

@ToString
@Setter
@Getter
public class Config {
    private String namespace;
    private Map<String, Object> info;
}
复制代码

2. ConfigEvent - 事件对象

// 这个注解,我们是根据Spring源码看到的 , 所以一致性,我就加了
@SuppressWarnings("serial")
public class ConfigEvent extends ApplicationEvent {

    public ConfigEvent(Config source) {
        super(source);
    }
}
复制代码

3. ConfigEventListener - 事件监听者

@Component
public class ConfigEventListener implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {

    @Override
    public void onApplicationEvent(ConfigEvent event) {
        System.out.println("接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName());
    }
	// 保证执行顺序 , 多个 ConfigEventListener就需要实现这个接口
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
	
    // 初始化以后要做什么 ? 
    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("初始化当前ConfigEventListener");
    }
}
复制代码

5 . ConfigServer - 配置中心服务

@Service
public class ConfigServer {
	
    // 注入applicationContext,因为只有他才可以执行发布事件
    @Autowired
    private ApplicationContext applicationContext;

	// 这个是开启异步 ,后面会说到
    // @Async
    public void publishConfig(){
        // 需要发布 --- > 改变的事件
         System.out.println("发布事件成功 , 当前线程 : "+Thread.currentThread().getName());
        applicationContext.publishEvent(getChange());
    }
    
    
    public ConfigEvent getChange(){
        Config config = new Config();
        config.setNamespace("application");
        HashMap<String, Object> conf = new HashMap<>();
        conf.put("server.port", 8088);
        config.setInfo(conf);
        return  new ConfigEvent(config);
    }
}
复制代码

6. 启动测试

@SpringBootApplication
public class SpringListenerApplication implements CommandLineRunner {

	public static void main(String[] args) {
		SpringApplication.run(SpringListenerApplication.class, args);
	}

	@Autowired
	private ConfigServer server;

	@Override
	public void run(String... args) throws Exception {
		server.publishConfig();
	}
}
复制代码

输出结果 :

.....
初始化当前ConfigEventListener
.... 
发布事件成功 , 当前线程 : main
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : main
复制代码

所以一个 Spring-Boot 的事件监听还是很简单的 ,类比到 Spring 一个道理,相信懂得人都知道 . 但是又一个问题是我们的 发布和监听都是 main线程 ,不好吧 ,玩意有很多事件了 ?

4. 开启异步发布

需要两个注解 @EnableAsync 启动Async功能 , 和 @Async 某个方法使用异步执行

发布事件成功 , 当前线程 : SimpleAsyncTaskExecutor-1
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : SimpleAsyncTaskExecutor-1
复制代码

我们发现就出现了线程池执行 , 这个理的线程池 ,是可以进行配置的 , 只需要我们显式的注入下面这个 SimpleAsyncTaskExecutor Bean 就可以了

@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
    // 需要传入一个 ThreadFactory实现类 , 所以看过我前面写的文章应该会写这个,比如 JUC- Executor那节
    executor.setThreadFactory(new MyThreadFactory("anthony"));
    return executor;
}
复制代码

输出结果 :

发布事件成功 , 当前线程 : anthony-1
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1
复制代码

5 . 多个监听器顺序执行

你可以跟我一样选择实现 ApplicationListenerOrdered ,或者你可以直接实现 SmartApplicationListener 都一样的哈,没有哪个好哪个不好

监听器 一 :

@Component
public class ConfigEventListenerStart implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {

    @Override
    public void onApplicationEvent(ConfigEvent event) {
        System.out.println("ConfigEventListenerStart 接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName());
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }


    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("初始化当前监听器 : " + this.toString());
    }
}
复制代码

监听器 二 :

@Component
public class ConfigEventListenerEnd implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {

    @Override
    public void onApplicationEvent(ConfigEvent event) {
        System.out.println("ConfigEventListenerEnd  接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName());
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE-1;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("初始化当前监听器 : " + this.toString());
    }
}
复制代码

输出结果 :

初始化当前监听器 : com.example.springlistener.listener.ConfigEventListenerEnd@6b54655f
初始化当前监听器 : com.example.springlistener.listener.ConfigEventListenerStart@665e9289
..... 
发布事件成功 , 当前线程 : anthony-1
ConfigEventListenerStart 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1
ConfigEventListenerEnd  接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1    
复制代码

5. Guava 中的 EventBus

​ Guava的EventBus 就是一个很好的事件注册发布的管理工具 , 他属于一种推送的模式 , 跟spring的很相似,

1. 依赖

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>28.0-jre</version>
</dependency>
复制代码

2. 快速开始

​ 主要的对象就是 , EventBus 事件总线, 他是管理所有监听者(或者叫做订阅者) ,通过 EventBus#register 或者 EventBus#unRegister 来管理的 , 同时监听者要有监听的事件 , 这里是基于方法级别的 (注意方法只能有一个参数,就是监听的事件), 需要在方法上加上 @Subscribe 注解来表示监听 , EventBus 可以通过 EventBus#post 方法来发布事件 , 对应类型的监听者就会收到 . 同时 EventBus 可以处理异常

public class QuicklyStart {

    public static void main(String[] args) {
        // 创建一个 事件总线
        EventBus bus = new EventBus(new SubscriberExceptionHandler() {
            @Override
            public void handleException(Throwable exception, SubscriberExceptionContext context) {
                // 处理订阅者异常信息
                System.out.println("异常信息 : "+exception.getMessage() + ", 异常事件 : " + context.getEvent());
            }
        });

        // 注册你的监听器 , 其实更加准确来说是订阅者 , 他属于一种发布订阅模式
        bus.register(new EventListener());

        // 事件总线发布事件
        bus.post("sb");

        // 事件总线发布事件
        bus.post(new Event("hello Guava"));

    }


}

/**
 * 事件源
 */
class Event {

    String msg;

    public Event(String msg) {
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "Event{" + "msg='" + msg + '/'' + '}';
    }
}

/**
 * 监听器
 */
class EventListener {

    /**
     * {@link Subscribe} 一个这个代表一个订阅者,EventBus会将符合的事件发布到对应的订阅者上 , 但是不支持java的基本数据类型, int 之类的
     *
     * @param event
     */
    @Subscribe
    public void onEvent(Event event) {
        System.out.println("当前线程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event);
    }


    @Subscribe
    public void onStringEvent(String event) {
        error(); // 模拟异常
        System.out.println("当前线程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event);
    }

    private void error() {
        int i = 1 / 0;
    }
}
复制代码

输出 :

异常信息 : / by zero, 异常事件 : sb
当前线程 : main, 接收到事件 : Event{msg='hello Guava'}
复制代码

3. 基本原理

其实很简单 , 第一注册的时候 :

/** Registers all subscriber methods on the given listener object. */
void register(Object listener) {
  // 其实就是注解扫描 , 然后把元信息都给整出来
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
  Class<?> eventType = entry.getKey();
  Collection<Subscriber> eventMethodsInListener = entry.getValue();
    // 查找有没有 
  CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

    // 没有创建一个对象
  if (eventSubscribers == null) {
    CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
    eventSubscribers =
        MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
  }

    // 添加进去 ,其实是 Subscriber对象 , 把method信息, 对象信息全部封装进去了
  eventSubscribers.addAll(eventMethodsInListener);
}
}
复制代码

第二就是 发布

public void post(Object event) {
      // 根据事件获取对应的 Subscriber
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
        // 发布出去
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }



@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  checkNotNull(subscribers);
    // 获取当前线程的队列 ,用ThreadLocal维护的线程安全, 其实是为了安全
  Queue<Event> queueForThread = queue.get();
    // 创建一个事件对象
  queueForThread.offer(new Event(event, subscribers));

  if (!dispatching.get()) {
    dispatching.set(true);
    try {
      Event nextEvent;
      while ((nextEvent = queueForThread.poll()) != null) {
        while (nextEvent.subscribers.hasNext()) {
            // 处理
          nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
        }
      }
    } finally {
      dispatching.remove();
      queue.remove();
    }
  }
}


 /** Dispatches {@code event} to this subscriber using the proper executor. */
  final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
                // 关键点 ,就是这 
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
                // 出现异常直接就 ... 调用异常回调方法了
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }



  @VisibleForTesting
  void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
        // 原来如此 , .........method.invok 真.... 所以可以抓取异常
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
  }
复制代码

所以这个玩意很简单 , 原理一看就分析出来了

原文  https://juejin.im/post/5e8573fd518825738943a992
正文到此结束
Loading...