一、介绍
1、观察者模式
拥有一些值得关注的状态的对象通常被称为目标, 由于它要将自身的状态改变通知给其他对象, 我们也将其称为发布者 (publisher)。 所有希望关注发布者状态变化的其他对象被称为订阅者 (subscribers)。
观察者模式其实就是订阅者订阅自己关心的主题和主题有数据变化后通知订阅者。
指多个对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。这种模式有时又称作发布-订阅模式、模型-视图模式,它是对象行为型模式。
2、角色介绍
在观察者模式中,有以下几个角色。
- 主题也叫被观察者(Subject):
- 定义被观察者必须实现的职责,
- 它能动态的增加取消观察者,它一般是抽象类或者是实现类,仅仅完成作为被观察者必须实现的职责:
- 管理观察者并通知观察者。
- 观察者(Observer):
观察者接受到消息后,即进行更新操作,对接收到的信息进行处理。
- 具体的被观察者(ConcreteSubject):
定义被观察者自己的业务逻辑,同时定义对哪些事件进行通知。
- 具体的观察者(ConcreteObserver):
具体的观察者,每个观察者接收到消息后的处理反应是不同的,每个观察者都有自己的处理逻辑。
3、适用场景
- 对象间存在一对多关系,一个对象的状态发生改变会影响其他对象。
- 当一个抽象模型有两个方面,其中一个方面依赖于另一方面时,可将这二者封装在独立的对象中以使它们可以各自独立地改变和复用。
- 实现类似广播机制的功能,不需要知道具体收听者,只需分发广播,系统中感兴趣的对象会自动接收该广播。
- 多层级嵌套使用,形成一种链式触发机制,使得事件具备跨域(跨越两种观察者类型)通知。
二、代码
首先,需要定义主题,每个主题需要持有观察者列表的引用,用于在数据变更的时候通知各个观察者:
public class Subject {
private List<Observer> observers = new ArrayList<Observer>();
private int state;
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
// 数据已变更,通知观察者们
notifyAllObservers();
}
// 注册观察者
public void attach(Observer observer) {
observers.add(observer);
}
// 通知观察者们
public void notifyAllObservers() {
for (Observer observer : observers) {
observer.update();
}
}
}
定义观察者接口:
public abstract class Observer {
protected Subject subject;
public abstract void update();
}
其实如果只有一个观察者类的话,接口都不用定义了,不过,通常场景下,既然用到了观察者模式,我们就是希望一个事件出来了,会有多个不同的类需要处理相应的信息。比如,订单修改成功事件,我们希望发短信的类得到通知、发邮件的类得到通知、处理物流信息的类得到通知等。
我们来定义具体的几个观察者类:
public class BinaryObserver extends Observer {
// 在构造方法中进行订阅主题
public BinaryObserver(Subject subject) {
this.subject = subject;
// 通常在构造方法中将 this 发布出去的操作一定要小心
this.subject.attach(this);
}
// 该方法由主题类在数据变更的时候进行调用
@Override
public void update() {
String result = Integer.toBinaryString(subject.getState());
System.out.println("订阅的数据发生变化,新的数据处理为二进制值为:" + result);
}
}
public class HexaObserver extends Observer {
public HexaObserver(Subject subject) {
this.subject = subject;
this.subject.attach(this);
}
@Override
public void update() {
String result = Integer.toHexString(subject.getState()).toUpperCase();
System.out.println("订阅的数据发生变化,新的数据处理为十六进制值为:" + result);
}
}
客户端使用也非常简单:
public static void main(String[] args) {
// 先定义一个主题
Subject subject1 = new Subject();
// 定义观察者
new BinaryObserver(subject1);
new HexaObserver(subject1);
// 模拟数据变更,这个时候,观察者们的 update 方法将会被调用
subject.setState(11);
}
当然,jdk 也提供了相似的支持,具体的大家可以参考 java.util.Observable
和 java.util.Observer
这两个类。
实际生产过程中,观察者模式往往用消息中间件来实现,如果要实现单机观察者模式,建议使用 Guava 中的 EventBus,它有同步实现也有异步实现,本文主要介绍设计模式,就不展开说了。
还有,即使是上面的这个代码,也会有很多变种,大家只要记住核心的部分,那就是一定有一个地方存放了所有的观察者,然后在事件发生的时候,遍历观察者,调用它们的回调函数。
三、Spring中的应用
1、spring的事件发布机制
Spring事件机制是观察者模式的实现。ApplicationContext
中事件处理是由 ApplicationEvent
类和ApplicationListener
接口来提供的。如果一个 Bean
实现了 ApplicationListener
接口,并且已经发布到容器中去,每次 ApplicationContext
发布一个 ApplicationEvent
事件,这个Bean就会接到通知。
ApplicationEvent
事件的发布需要显示触发,要么 Spring
触发,要么我们编码触发。spring内置事件由spring 触发。我们先来看一下,如何自定义spring事件,并使其被监听和发布。
(1)事件
事件,ApplicationEvent
,该抽象类继承了EventObject
,EventObject
是JDK中的类,并建议所有的事件都应该继承自 EventObject
。
public class EventObject implements java.io.Serializable {
private static final long serialVersionUID = 5516075349620653480L;
/**
* 事件发布者
*/
protected transient Object source;
public EventObject(Object source) {
if (source == null)
throw new IllegalArgumentException("null source");
this.source = source;
}
public Object getSource() {
return source;
}
public String toString() {
return getClass().getName() + "[source=" + source + "]";
}
}
public abstract class ApplicationEvent extends EventObject {
private static final long serialVersionUID = 7099057708183571937L;
private final long timestamp = System.currentTimeMillis();
public ApplicationEvent(Object source) {
super(source);
}
public final long getTimestamp() {
return this.timestamp;
}
}
(2)监听器
ApplicationListener
,是一个接口,该接口继承了EventListener
接口。EventListener
接口是JDK中的,建议所有的事件监听器都应该继承 EventListener
。
public interface EventListener {
}
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
void onApplicationEvent(E var1);
}
(3)事件发布器
ApplicationEventPublisher
,ApplicationContext
继承了该接口,在 ApplicationContext
的抽象实现类 AbstractApplicationContext
中做了实现下面我们来看一下
/**
* 封装事件发布功能的接口
*/
@FunctionalInterface
public interface ApplicationEventPublisher {
/**
* 发布事件
*/
default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}
/**
* 发布事件
*/
void publishEvent(Object event);
}
在 AbstractApplicationContext
实现类中的具体实现
@Nullable
private Set<ApplicationEvent> earlyApplicationEvents;
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
// Decorate event as an ApplicationEvent if necessary
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
}
}
// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// Publish event via parent context as well...
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
可以看到,AbstractApplicationContext
中 publishEvent
方法最终执行发布事件的是ApplicationEventMulticaster#multicastEvent
方法
/**
* 将给定的应用程序事件多播到适当的侦听器。
*/
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
(4)总结
- 定义一个事件,该事件继承ApplicationEvent
- 定义一个监听器,实现ApplicationListener接口
- 定义一个事件发布器,实现ApplicationEventPublisherAware接口
- 调用ApplicationEventPublisher#publishEvent方法.
2、Spring事件监听
spring的容器初始化过程中,refresh方法中,有这样两个方法,initApplicationEventMulticaster()
和 registerListeners()
(1)initApplicationEventMulticaster()
AbstractApplicationContext
初始化ApplicationEventMulticaster。如果上下文中没有定义,则使用SimpleApplicationEventMulticaster。
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
//BeanFactory中是否有ApplicationEventMulticaster
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
//如果BeanFactory中不存在,就创建一个SimpleApplicationEventMulticaster
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
(2)registerListeners()
registerListeners 是将各种实现了 ApplicationListener 的监听器注册到 ApplicationEventMulticaster 事件广播器中
protected void registerListeners() {
// Register statically specified listeners first.
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let post-processors apply to them!
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
//把监听器注册到事件发布器上
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
// Publish early application events now that we finally have a multicaster...
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (earlyEventsToProcess != null) {
//如果内置监听事件集合不为空
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
//执行spring内置的监听方法
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
earlyApplicationListeners的本质还是ApplicationListener。Spring单例Ban的实例化是在Refresh阶段实例化的,那么用户自定义的一些ApplicationListener组件自然也是在这个阶段才初始化,但是Spring容器启动过程中,在Refresh完成之前还有很多事件:如Spring上下文环境准备等事件,这些事件又是Spring容器启动必须要监听的。所以Spring定义了一个earlyApplicationListeners集合,这个集合中的Listener在factories文件中定义好,在容器Refresh之前预先实例化好,然后就可以监听Spring容器启动过程中的所有事件。
当registerListeners方法执行完成,我们的监听器已经添加到多播器 SimpleApplicationEventMulticaster
中了,并且earlyEvent 早期事件也已经执行完毕。但是我们发现,如果自定义了一个监听器去监听spring内置的事件,此时并没有被执行,那我们注册的监听器是如何被执行的呢?答案在finishRefresh方法中。
(3)finishRefresh()
protected void finishRefresh() {
// Clear context-level resource caches (such as ASM metadata from scanning).
clearResourceCaches();
// Initialize lifecycle processor for this context.
initLifecycleProcessor();
// Propagate refresh to lifecycle processor first.
getLifecycleProcessor().onRefresh();
// Publish the final event.
//容器中的类全部初始化完毕后,触发刷新事件
publishEvent(new ContextRefreshedEvent(this));
// Participate in LiveBeansView MBean, if active.
LiveBeansView.registerApplicationContext(this);
}
如果我们想要实现在spring容器中所有bean创建完成后做一些扩展功能,我们就可以实现ApplicationListener这样我们就可以实现其功能了。
这就是Spring 同步事件监听
3、异步事件监听
继续从 finishRefresh()
中的 pushEvent
往下看
AbstractApplicationContext#pushEvent
public abstract class AbstractApplicationContext extends DefaultResourceLoader
implements ConfigurableApplicationContext {
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
// Decorate event as an ApplicationEvent if necessary
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
}
}
// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
//获取当前注入的发布器,执行发布方法
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// Publish event via parent context as well...
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
}
最终执行发布事件的是SimpleApplicationEventMulticaster#multicastEvent方法
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
//拿到所有的监听器,如果异步执行器不为空,异步执行
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
//执行监听方法,同步执行
invokeListener(listener, event);
}
}
}
异步事件通知主要依靠SimpleApplicationEventMulticaster 类中的Executor去实现的,如果这个变量不配置的话默认事件通知是同步的, 否则就是异步通知了,要实现同时支持同步通知和异步通知就得从这里下手;
如果BeanFactory中已经有了SimpleApplicationEventMulticaster则不会重新创建,那么我们可以再spring中注册一个SimpleApplicationEventMulticaster并且向其中注入对应的Executor这样我们就可以得到一个异步执行监听的SimpleApplicationEventMulticaster了,我们的通知就会通过Executor异步执行。这样可以大大提高事件发布的效率。
在springboot项目中我们可以增加一个配置类来实现
@Configuration
@EnableAsync
public class Config {
@Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)
public SimpleApplicationEventMulticaster simpleApplicationEventMulticaster(){
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());
return simpleApplicationEventMulticaster;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(300);
executor.setThreadNamePrefix("thread-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}