18.spring系列- 事件机制

先看几个问题

  1. 为什么使用事件?
  2. Spring事件的实现有哪些方式?
  3. spring事件监听器的处理是同步还是异步?
  4. spring事件监听器支持自定义顺序吗?

为什么需要使用事件?

先来看一个业务场景:

产品经理:这两天你帮我实现一个注册的功能
我:注册功能比较简单,将用户信息入库就可以了,伪代码如下:

public void registerUser(UserModel user){
    //插入用户信息到db,完成注册
    this.insertUser(user);
}

过了几天,产品经理:注册成功之后,给用户发送一封注册成功的邮件
我:修改了一下上面注册代码,如下:

public void registerUser(UserModel user){
    //插入用户信息到db,完成注册
    this.insertUser(user);
    //发送邮件
    this.sendEmailToUser(user);
}

由于修改了注册接口,所以所有调用这个方法的地方都需要重新测试一遍
又过了几天,产品经理:注册成功之后,给用户发一下优惠券
我:好的,又调整了一下代码:

public void registerUser(UserModel user){
    //插入用户信息到db,完成注册
    this.insertUser(user);
    //发送邮件
    this.sendEmailToUser(user);
    //发送优惠券
    this.sendCouponToUser(user);
}

我:测试的兄弟们,辛苦一下大家,注册接口又修改了,帮忙再过一遍。
过了一段时间,公司效益太好,产品经理:注册的时候,取消给用户发送优惠券的功能。
我:又跑去调整了一下上面代码,将发送优惠券的功能干掉了,如下:

public void registerUser(UserModel user){
    //插入用户信息到db,完成注册
    this.insertUser(user);
    //发送邮件
    this.sendEmailToUser(user);
}

由于调整了代码,而注册功能又属于核心业务,所以需要让测试再次帮忙过一遍,又要麻烦测试来一遍了。
突然有一天,产品经理:注册接口怎么这么慢啊,并且还经常失败?
我:赶紧跑去查看了一下运行日志,发现注册的时候给用户发送邮件不稳定,依赖于第三方邮件服务器,耗时比较长,并且容易失败。
产品经理:邮件你可以不发,但是你得确保注册功能必须可以用啊。
我想了想,将上面代码改成了下面这样,发送邮件放在了子线程中执行:

public void registerUser(UserModel user){
    //插入用户信息到db,完成注册
    this.insertUser(user);
    //发送邮件,放在子线程中执行,邮件的发送结果对注册逻辑不会有干扰作用
    new Thread(()->{
        this.sendEmailToUser(user);
    }).start();
}

又过了几天,产品经理又跑来了说:最近效益不好,需要刺激用户消费,注册的时候继续发送优惠券。
我:。。。

花了点时间,好好复盘整理了一下:发现问题不在于产品经理,从业务上来看,产品提的这些需求都是需求合理的,而结果代码反复调整、测试反复测试,以及一些次要的功能导致注册接口不稳定,这些问题归根到底,主要还是我的设计不合理导致的,将注册功能中的一些次要的功能耦合到注册的方法中了,并且这些功能可能会经常调整,导致了注册接口的不稳定性。

其实上面代码可以这么做:

找3个人:注册器、路人A、路人B。

注册器:负责将用户信息落库,落库成功之后,喊一声:用户XXX注册成功了。

路人A和路人B,竖起耳朵,当听到有人喊:XXX注册成功 的声音之后,立即行动做出下面反应:

路人A:负责给XXX发送一封注册邮件

路人B:负责给XXX发送优惠券

我们来看一下:

注册器只负责将用户信息落库,及广播一条用户注册成功的消息。

A和B相当于一个监听者,只负责监听用户注册成功的消息,当听到有这个消息产生的时候,A和B就去做自己的事情。

这里面注册器是感知不到A/B存在的,A和B也不用感知注册器的存在,A/B只用关注是否有人广播:XXX注册成功了的消息,当AB听到有人广播注册成功的消息,他们才做出反应,其他时间闲着休息。

这种方式就非常好:

当不想给用户发送优惠券的时候,只需要将B去掉就行了,此时基本上也不用测试,注册一下B的代码就行了。

若注册成功之后需要更多业务,比如还需要给用户增加积分,只需新增一个监听者C,监听到注册成功消息后,负责给用户添加积分,此时根本不用去调整注册的代码,开发者和测试人员只需要确保监听者C中的正确性就可以了。

上面这种模式就是事件模式。

下面我们使用事件模式实现用户注册的业务

事件对象 :表示所有事件的父类,内部有个source字段,表示事件源;我们自定义的事件需要继承这个类。

/**
 * 事件对象
 */
public abstract class AbstractEvent {

    //事件源
    protected Object source;

    public AbstractEvent(Object source) {
        this.source = source;
    }

    public Object getSource() {
        return source;
    }

    public void setSource(Object source) {
        this.source = source;
    }
}

事件监听器

/**
 * 事件监听器
 *
 * @param <E> 当前监听器感兴趣的事件类型
 */
public interface EventListener<E extends AbstractEvent> {
    /**
     * 此方法负责处理事件
     *
     * @param event 要响应的事件对象
     */
    void onEvent(E event);
}

事件广播器

/**
 * 事件广播器:
 * 1.负责事件监听器的管理(注册监听器&移除监听器,将事件和监听器关联起来)
 * 2.负责事件的广播(将事件广播给所有的监听器,对该事件感兴趣的监听器会处理该事件)
 */
public interface EventMulticaster {

    /**
     * 广播事件给所有的监听器,对该事件感兴趣的监听器会处理该事件
     *
     * @param event
     */
    void multicastEvent(AbstractEvent event);

    /**
     * 添加一个事件监听器(监听器中包含了监听器中能够处理的事件)
     *
     * @param listener 需要添加监听器
     */
    void addEventListener(EventListener<?> listener);


    /**
     * 将事件监听器移除
     *
     * @param listener 需要移除的监听器
     */
    void removeEventListener(EventListener<?> listener);
}

事件广播默认实现

/**
 * 事件广播器简单实现
 */
public class SimpleEventMulticaster implements EventMulticaster {

    private Map<Class<?>, List<EventListener>> eventObjectEventListenerMap = new ConcurrentHashMap<>();

    @Override
    public void multicastEvent(AbstractEvent event) {
        List<EventListener> eventListeners = this.eventObjectEventListenerMap.get(event.getClass());
        if (eventListeners != null) {
            for (EventListener eventListener : eventListeners) {
                eventListener.onEvent(event);
            }
        }
    }

    @Override
    public void addEventListener(EventListener<?> listener) {
        Class<?> eventType = this.getEventType(listener);
        List<EventListener> eventListeners = this.eventObjectEventListenerMap.get(eventType);
        if (eventListeners == null) {
            eventListeners = new ArrayList<>();
            this.eventObjectEventListenerMap.put(eventType, eventListeners);
        }
        eventListeners.add(listener);
    }

    @Override
    public void removeEventListener(EventListener<?> listener) {
        Class<?> eventType = this.getEventType(listener);
        List<EventListener> eventListeners = this.eventObjectEventListenerMap.get(eventType);
        if (eventListeners != null) {
            eventListeners.remove(listener);
        }
    }

    /**
     * 获取事件监听器需要监听的事件类型
     *
     * @param listener
     * @return
     */
    protected Class<?> getEventType(EventListener listener) {
        ParameterizedType parameterizedType = (ParameterizedType) listener.getClass().getGenericInterfaces()[0];
        Type eventType = parameterizedType.getActualTypeArguments()[0];
        return (Class<?>) eventType;
    }

}

上面3个类支撑了整个时间模型,下面我们使用上面三个类来实现注册的功能,目标是:高内聚低耦合,让注册逻辑方便扩展。

自定义用户注册成功事件类

/**
 * 用户注册成功事件
 */
public class UserRegisterSuccessEvent extends AbstractEvent {
    //用户名
    private String userName;

    /**
     * 创建用户注册成功事件对象
     *
     * @param source   事件源
     * @param userName 当前注册的用户名
     */
    public UserRegisterSuccessEvent(Object source, String userName) {
        super(source);
        this.userName = userName;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }
}

用户注册服务

/**
 * 用户注册服务
 */
public class UserRegisterService {
    //事件发布者
    private EventMulticaster eventMulticaster; //@0

    /**
     * 注册用户
     *
     * @param userName 用户名
     */
    public void registerUser(String userName) {
        //用户注册(将用户信息入库等操作)
        System.out.println(String.format("用户【%s】注册成功", userName));
        //广播事件
        this.eventMulticaster.multicastEvent(new UserRegisterSuccessEvent(this, userName));
    }

    public EventMulticaster getEventMulticaster() {
        return eventMulticaster;
    }

    public void setEventMulticaster(EventMulticaster eventMulticaster) {
        this.eventMulticaster = eventMulticaster;
    }
}

下面我们使用spring来将上面的对象组装起来

@Configuration
@ComponentScan
public class MainConfig0 {

    /**
     * 注册一个bean:事件发布者
     *
     * @param eventListeners
     * @return
     */
    @Bean
    @Autowired(required = false)
    public EventMulticaster eventMulticaster(List<EventListener> eventListeners) {
        EventMulticaster eventPublisher = new SimpleEventMulticaster();
        if (eventListeners != null) {
            eventListeners.forEach(eventPublisher::addEventListener);
        }
        return eventPublisher;
    }

    /**
     * 注册一个bean:用户注册服务
     *
     * @param eventMulticaster
     * @return
     */
    @Bean
    public UserRegisterService userRegisterService(EventMulticaster eventMulticaster) {
        UserRegisterService userRegisterService = new UserRegisterService();
        userRegisterService.setEventMulticaster(eventMulticaster);
        return userRegisterService;
    }
}

来个测试用例模拟用户注册

@Test
    public void test0() {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MainConfig0.class);
        //获取用户注册服务
        com.javacode2018.lesson003.demo1.test0.userregister.UserRegisterService userRegisterService =
                context.getBean(com.javacode2018.lesson003.demo1.test0.userregister.UserRegisterService.class);
        //模拟用户注册
        userRegisterService.registerUser("spring");
    }

运行输出:

用户【spring】注册成功

添加注册成功发送邮件功能


/**
 * 用户注册成功事件监听器->负责给用户发送邮件
 */
@Component
public class SendEmailOnUserRegisterSuccessListener implements EventListener<UserRegisterSuccessEvent> {
    @Override
    public void onEvent(UserRegisterSuccessEvent event) {
        System.out.println(
                String.format("给用户【%s】发送注册成功邮件!", event.getUserName()));
    }
}

运行结果:

用户【spring】注册成功
给用户【spring】发送注册成功邮件!

上面将注册的主要逻辑(用户信息落库)和次要的业务逻辑(发送邮件)通过事件的方式解耦了。次要的业务做成了可插拔的方式,比如不想发送邮件了,只需要将邮件监听器上面的@Component注释就可以了,非常方便扩展。

spring中实现事件模式

事件相关的几个类:

  1. ApplicationEvent:事件对象的父类
  2. ApplicationListener:事件监听器接口
  3. ApplicationEventMulticaster:事件广播器
  4. SimpleApplicationEventMulticaster:事件广播器的简单实现

硬编码的方式使用spring事件

步骤一:定义事件

自定义事件,需要继承ApplicationEvent类

步骤二:定义监听器

自定义器监听器,需要实现ApplicationListener接口,并实现onApplicationEvent这个方法,用来处理感兴趣的事件

步骤三:创建事件广播器

创建事件广播器ApplicationEventMulticaster,这个接口,可以实现这个接口,也可以实现系统给我们提供的SimpleApplicationEventMulticaster接口

步骤四:向广播器中注册事件监听器

将事件监听器注册到广播器中ApplicationEventMulticaster中

步骤五:通过广播器发布事件

广播事件,调用ApplicationEventMulticaster#multicastEvent方法广播事件,此时广播器中对这个事件感兴趣的监听器会处理这个事件。

我们来个案例:

//订单事件
public class OrderCreateEvent extends ApplicationEvent {

    //订单编号
    private Long orderNo;

    /**
     * Create a new {@code ApplicationEvent}.
     *
     * @param source the object on which the event initially occurred or with
     *               which the event is associated (never {@code null})
     */
    public OrderCreateEvent(Object source, Long orderNo) {
        super(source);
        this.orderNo = orderNo;
    }

    public Long getOrderNo() {
        return orderNo;
    }

    public void setOrderNo(Long orderNo) {
        this.orderNo = orderNo;
    }
}
//监听器
@Component
public class SendEmailOnOrderCreateListener implements ApplicationListener<OrderCreateEvent> {
    @Override
    public void onApplicationEvent(OrderCreateEvent event) {
        System.out.println(String.format("订单【%d】创建成功,给下单人发送邮件通知!", event.getOrderNo()));
    }
}

测试方法:

    @Test
    public void testEvent1() throws InterruptedException {
        //创建事件广播器
        ApplicationEventMulticaster applicationEventMulticaster = new SimpleApplicationEventMulticaster();
        //注册事件监听器
        applicationEventMulticaster.addApplicationListener(new SendEmailOnOrderCreateListener());
        //广播事件订单创建事件
        applicationEventMulticaster.multicastEvent(new OrderCreateEvent(applicationEventMulticaster, 1l));
    }

运行结果:

订单【1】创建成功,给下单人发送邮件通知!

Spring为了简化事件的使用,提供了2种使用方式

  1. 面向接口的方式
  2. 面向@EventListener注解的方式

我们先看第一种方式:接口的方式

定义用户注册事件:

public class UserRegisterEvent extends ApplicationEvent {

    private String userName;

    /**
     * Create a new {@code ApplicationEvent}.
     *
     * @param source the object on which the event initially occurred or with
     *               which the event is associated (never {@code null})
     */
    public UserRegisterEvent(Object source, String userName) {
        super(source);
        this.userName = userName;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }
}

定义事件监听器:

@Component
public class SendEmailListener implements ApplicationListener<UserRegisterEvent> {


    @Override
    public void onApplicationEvent(UserRegisterEvent event) {
        System.out.println(String.format("给用户【%s】发送注册成功邮件!", event.getUserName()));
    }
}

定义用户注册服务:

@Component
public class UserRegisterService implements ApplicationEventPublisherAware {

    private ApplicationEventPublisher applicationEventPublisher;
	//如果我们想在普通的bean中获取ApplicationEventPublisher对象,需要实现ApplicationEventPublisherAware接口,利用ApplicationEventPublisher#publishEvent方法进行发布事件
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * 负责用户注册及发布事件的功能
     *
     * @param userName 用户名
     */
    public void registerUser(String userName) {
        //用户注册(将用户信息入库等操作)
        System.out.println(String.format("用户【%s】注册成功", userName));
        //发布注册成功事件
        this.applicationEventPublisher.publishEvent(new UserRegisterEvent(this, userName));
    }

}

测试方法:

    @Test
    public void testEvent2() throws InterruptedException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MainConfig6.class);
        com.spring.event2.UserRegisterService bean = context.getBean(com.spring.event2.UserRegisterService.class);
        bean.registerUser("spring event");
    }

运行结果:

用户【spring event】注册成功
给用户【spring event】发送注册成功邮件!

第二种方式:@EventListener

我们只修改上面案例中的监听器,改为注解形式:

@Component
public class SendEmailListener{

    @EventListener
    public void sendMail(UserRegisterEvent event) {
        System.out.println(String.format("给用户【%s】发送注册成功邮件!", event.getUserName()));
    }

    @EventListener
    public void sendCompon(UserRegisterEvent event) {
        System.out.println(String.format("给用户【%s】发送优惠券!", event.getUserName()));
    }
}

其他不变,运行结果:

用户【spring event】注册成功
给用户【spring event】发送注册成功邮件!
给用户【spring event】发送优惠券!

原理:
spring中处理@EventListener注解源码位于下面的方法中

org.springframework.context.event.EventListenerMethodProcessor#afterSingletonsInstantiated

EventListenerMethodProcessor实现了SmartInitializingSingleton接口,SmartInitializingSingleton接口中的afterSingletonsInstantiated方法会在所有单例的bean创建完成之后被spring容器调用。这块的内容可以去看一下:spring系列-Bean生命周期

监听器的排序功能

通过接口实现监听器的情况

如果自定义的监听器是通过ApplicationListener接口实现的,那么指定监听器的顺序有三种方式:

  1. 实现org.springframework.core.Ordered接口(需要实现一个getOrder方法,返回顺序值,值越小,顺序越高)
  2. 实现org.springframework.core.PriorityOrdered接口(PriorityOrdered接口继承了方式一中的Ordered接口,所以如果你实现PriorityOrdered接口,也需要实现getOrder方法。)
  3. 类上使用@org.springframework.core.annotation.Order注解

这几种方式排序规则:
PriorityOrdered#getOrder ASC,Ordered或@Order ASC

监听器异步模式

监听器最终是通过ApplicationEventMulticaster内部的实现来调用的,所以我们关注的重点就是这个类,这个类默认有个实现类SimpleApplicationEventMulticaster,这个类是支持监听器异步调用的,里面有个字段:

private Executor taskExecutor;

高并发比较熟悉的朋友对Executor这个接口是比较熟悉的,可以用来异步执行一些任务。

再来看一下SimpleApplicationEventMulticaster中事件监听器的调用,最终会执行下面这个方法:

@Override
	public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
		ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
		Executor executor = getTaskExecutor();
		for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
			if (executor != null) {
				executor.execute(() -> invokeListener(listener, event));
			}
			else {
				invokeListener(listener, event);
			}
		}
	}

我们看一下容器启动的时候,初始化操作:

	// Initialize event multicaster for this context.
	initApplicationEventMulticaster();
/**
	 * Initialize the ApplicationEventMulticaster.
	 * Uses SimpleApplicationEventMulticaster if none defined in the context.
	 * @see org.springframework.context.event.SimpleApplicationEventMulticaster
	 */
	protected void initApplicationEventMulticaster() {
		ConfigurableListableBeanFactory beanFactory = getBeanFactory();
		if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
			this.applicationEventMulticaster =
					beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
			if (logger.isTraceEnabled()) {
				logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
			}
		}
		else {
			this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
			beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
			if (logger.isTraceEnabled()) {
				logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
						"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
			}
		}
	}

大概分析:判断spring容器中是否有名称为applicationEventMulticaster的bean,如果有就将其作为事件广播器,否则创建一个SimpleApplicationEventMulticaster作为广播器,并将其注册到spring容器中。

从上面可以得出结论:我们只需要自定义一个类型为SimpleApplicationEventMulticaster名称为applicationEventMulticaster的bean就可以了,顺便给executor设置一个值,就可以实现监听器异步执行了。

具体实现:

@ComponentScan
@Configuration
public class MainConfig8 {

    @Bean
    public ApplicationEventMulticaster applicationEventMulticaster(){
        //创建一个事件广播器
        SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();
        //给广播器提供一个线程池,通过这个线程池来调用事件监听器
        Executor taskExecutor = this.applicationEventMulticasterThreadPool().getObject();
        //设置异步执行器
        multicaster.setTaskExecutor(taskExecutor);
        return multicaster;
    }

    @Bean
    public ThreadPoolExecutorFactoryBean applicationEventMulticasterThreadPool() {
        ThreadPoolExecutorFactoryBean result = new ThreadPoolExecutorFactoryBean();
        result.setThreadNamePrefix("applicationEventMulticasterThreadPool-");
        result.setCorePoolSize(5);
        return result;
    }
}

其他不变,把打印语句上添加上线程名称,看一下是否是同一个线程,运行结果:

线程【main】负责用户【spring event】注册成功
线程【applicationEventMulticasterThreadPool-2】负责给用户【spring event】发送优惠券!
线程【applicationEventMulticasterThreadPool-1】负责给用户【spring event】发送注册成功邮件!

关于事件,我们就说到这里。接下来我们介绍bean的循环依赖。