记一次RocketMQ服务启动时 NullPointerException问题
背景
利用 RocketMQ 的 @ExtRocketMQTemplateConfiguration 注解配置拓展 RocketMQTemplate 作为发消息的 template,RocketMQ Consumer 和 ExtRocketMQTemplate 在同一个项目里。相关RocketMQ版本信息如下:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
问题
服务启动时, 报错
java.lang.NullPointerException
at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:475)
at org.apache.rocketmq.spring.core.RocketMQTemplate.doSend(RocketMQTemplate.java:977)
at org.apache.rocketmq.spring.core.RocketMQTemplate.doSend(RocketMQTemplate.java:60)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at com.lijun.samples.rocketmq.test2.Producer.sendMessage(Producer.java:22)
at com.lijun.samples.rocketmq.test2.Task.init(Task.java:30)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:602)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:524)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:944)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:434)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:338)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1332)
at com.lijun.samples.rocketmq.Test2Application.main(Test2Application.java:13)
问题分析排查
相关代码:
/**
 * Extension template declared through {@code @ExtRocketMQTemplateConfiguration}:
 * the name server is taken from the {@code ${rocketmqServer}} placeholder and the
 * producer group is {@code testExtProducer}. The class adds no members of its own.
 */
@Component
@ExtRocketMQTemplateConfiguration(nameServer = "${rocketmqServer}", group = "testExtProducer")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
/**
 * Reproduction listener: consumes {@code test_send_topic} and immediately publishes
 * again through the injected {@link ExtRocketMQTemplate}.
 */
@Service
@RocketMQMessageListener(nameServer = "${rocketmqServer}",
        topic = "test_send_topic", consumerGroup = "test_group_c")
public class Consumer implements RocketMQListener<String> {

    @Resource
    private ExtRocketMQTemplate rocketMQTemplate;

    /**
     * Prints the consumed payload and re-sends it to the same topic.
     *
     * @param message body of the consumed message
     */
    @Override
    public void onMessage(String message) {
        System.out.println("consumer msg=" + message);
        // Reproduction code only: re-sending to the consumed topic makes the message
        // loop forever. Adjust this in a real implementation.
        rocketMQTemplate.send("test_send_topic", new GenericMessage<>(message));
    }
}
为什么出现: 服务启动时 RocketMQ Consumer 消费到消息后 处理完后会发送一条MQ消息出去, 我们使用的是 ExtRocketMQTemplateConfiguration 注册的拓展RocketMQTemplate 来发送消息
源码上来看 ListenerContainerConfiguration 负责创建RocketMQ Listener 也就是注册消费者. ExtProducerResetConfiguration 负责将我们配置的拓展nameServer、group实例化成一个producer ,设置进我们的拓展 ExtRocketMQTemplate
SmartInitializingSingleton.afterSingletonsInstantiated 调用时机为 所有单例bean 实例化完成后调用
@Configuration public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton { @Override public void afterSingletonsInstantiated() { Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class) .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); beans.forEach(this::registerContainer); } private void registerContainer(String beanName, Object bean) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName()); } if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName()); } RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup()); String topic = this.environment.resolvePlaceholders(annotation.topic()); boolean listenerEnabled = (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP) .getOrDefault(topic, true); if (!listenerEnabled) { log.debug( "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic); return; } validate(annotation); String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext) 
applicationContext; genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { // 注意这 直接启动了consumer container.start(); } catch (Exception e) { log.error("Started container failed. {}", container, e); throw new RuntimeException(e); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); } } @Configuration public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton { @Override public void afterSingletonsInstantiated() { Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class) .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); beans.forEach(this::registerTemplate); } private void registerTemplate(String beanName, Object bean) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName()); } ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class); GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext; validate(annotation, genericApplicationContext); DefaultMQProducer mqProducer = createProducer(annotation); // Set instanceName same as the beanName mqProducer.setInstanceName(beanName); try { mqProducer.start(); } catch (MQClientException e) { throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}", 
beanName), e); } RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean; // 注意这 这里才设置 producer rocketMQTemplate.setProducer(mqProducer); rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); log.info("Set real producer to :{} {}", beanName, annotation.value()); } private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) { DefaultMQProducer producer ; // 省略过程 return producer; } }从启动日志上来看 ListenerContainerConfiguration 先于 ExtProducerResetConfiguration 执行完成,也就是消费者先注册成功再设置producer 在这期间 如果消费者消费到消息 马上发消息 则producer 为空
解决问题
从上面分析来看,其实也就是consumer 先于producer 启动造成的,所以得想办法调整 两者的顺序
解决方案1:
分析 SmartInitializingSingleton.afterSingletonsInstantiated 调用地方源码可以发现 是根据beanDefinition 顺序来执行的, 所以我们可以尝试修改两者顺序, 第一种是改源码(我们暂时不考虑), 第二种可以尝试在springboot启动类上添加 @Import({ExtProducerResetConfiguration.class, ListenerContainerConfiguration.class}) 来解决
// Excerpt of Spring's DefaultListableBeanFactory. The enclosing method header is
// elided (presumably preInstantiateSingletons -- confirm against the Spring source).
public class DefaultListableBeanFactory extends AbstractAutowireCapableBeanFactory
implements ConfigurableListableBeanFactory, BeanDefinitionRegistry, Serializable {
// Iterate over a copy of the bean definition names, i.e. in that list's order.
List<String> beanNames = new ArrayList<>(this.beanDefinitionNames);
// (code omitted here)
for (String beanName : beanNames) {
Object singletonInstance = getSingleton(beanName);
// Only singletons implementing SmartInitializingSingleton receive the callback.
if (singletonInstance instanceof SmartInitializingSingleton) {
StartupStep smartInitialize = this.getApplicationStartup().start("spring.beans.smart-initialize")
.tag("beanName", beanName);
SmartInitializingSingleton smartSingleton = (SmartInitializingSingleton) singletonInstance;
// With a SecurityManager installed the callback runs inside a privileged action.
if (System.getSecurityManager() != null) {
AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
smartSingleton.afterSingletonsInstantiated();
return null;
}, getAccessControlContext());
}
else {
smartSingleton.afterSingletonsInstantiated();
}
smartInitialize.end();
}
}
}
解决方案2:
我们可以延迟 consumer 的启动来解决, 先介绍两个类 Lifecycle SmartLifecycle; 我们可以利用AOP将DefaultRocketMQListenerContainer的start方法延迟至容器启动时才调用
/** Start/stop contract, as summarized by the article from Spring's Lifecycle interface. */
public interface Lifecycle {
void start(); // called after the container has started
void stop(); // called before the container shuts down
boolean isRunning(); // whether this component is currently running
}
/**
 * Lifecycle extension with auto-startup and phase ordering, as summarized by the
 * article from Spring's SmartLifecycle interface.
 */
public interface SmartLifecycle extends Lifecycle, Phased {

    /** Default phase: the largest int value. */
    int DEFAULT_PHASE = Integer.MAX_VALUE;

    /** Whether start()/stop() are invoked automatically; defaults to true. */
    default boolean isAutoStartup() {
        return true;
    }

    /**
     * Stops this component and then notifies the caller through the callback.
     *
     * @param callback completion callback; per the article, if it is never run the
     *                 container waits 30s before shutting down, otherwise it can
     *                 shut down without waiting
     */
    default void stop(Runnable callback) {
        stop();
        callback.run();
    }

    /**
     * Ordering among SmartLifecycle beans: per the article, a smaller value has its
     * start() called earlier, and stop() runs in the opposite order.
     */
    default int getPhase() {
        return DEFAULT_PHASE;
    }
}
具体实现方式:
* Class Name is RocketMqListenerAspect
*/
@Component
@Aspect
public class RocketMqListenerAspect implements SmartLifecycle {
private boolean running = false;
@Pointcut("execution(* org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.start())")
private void pointcut() {
}
private final List<ProceedingJoinPoint> joinPoints = new ArrayList<>(); // 方案1 持有joinPoint对象
private final List<DefaultRocketMQListenerContainer> containers = new ArrayList<>(); // 方案2 持有container对象
@Around(value = "pointcut()")
public Object aroundStart(ProceedingJoinPoint pjp) throws Throwable {
if (!isRunning()) {
// 如果已经执行过了 就不需要再执行了
joinPoints.add(pjp); // 1
containers.add((DefaultRocketMQListenerContainer) pjp.getThis());//2
return null;
}
return pjp.proceed();
}
@Override
public void stop() {
// 2 stop 可以不用实现 服务关闭时 会自动 调用 DefaultRocketMQListenerContainer.destroy 方案关闭
// if (isRunning()) {
// System.err.println("stop rocketMq listener!");
// this.running = false;
// containers.forEach(DefaultRocketMQListenerContainer::stop);
// }
}
@Override
public boolean isRunning() {
return running;
}
@Override
public void start() {
if (!isRunning()) {
System.err.println("start");
this.running = true;
// 1
for (DefaultRocketMQListenerContainer listenerContainer : containers) {
System.err.println("real start " + listenerContainer);
listenerContainer.start();
}
// 2
for (ProceedingJoinPoint proceedingJoinPoint : joinPoints) {
try {
System.err.println("real start " + proceedingJoinPoint.getThis());
proceedingJoinPoint.proceed();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}
@Override
public int getPhase() {
return Integer.MIN_VALUE;
}
}
这里可以持有 DefaultRocketMQListenerContainer对象 也可以持有 ProceedingJoinPoint 对象 2选一
注意
本文的解决办法 仅针对消费了消息立马发消息的场景, 如果是bean初始化完后立马发消息依旧会出现NPE 异常,示例代码:
/**
 * Example of the remaining failure mode: sending from a {@code @PostConstruct}
 * callback through the injected extension template.
 */
public class Task {

    @Resource
    private ExtRocketMQTemplate template;

    /** Sends one message as soon as this bean has been initialized. */
    @PostConstruct
    public void init() {
        // This can still hit the NPE: the template's producer may not be set yet.
        template.send("test_send_topic", new GenericMessage<>("hello"));
    }
}
归根结底,还是ExtRocketMQTemplate中producer初始化晚了。
如果我们将 producer 初始化提前到 BeanPostProcessor 处理的过程中,就不会出现这个问题了
/**
 * Variant of {@code ExtProducerResetConfiguration} that wires the producer into each
 * {@code @ExtRocketMQTemplateConfiguration} template from a {@link BeanPostProcessor}
 * callback, i.e. before the template bean's own initialization callbacks run.
 */
public class ExtProducerResetConfiguration implements ApplicationContextAware, BeanPostProcessor {

    private final static Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class);

    private ConfigurableApplicationContext applicationContext;
    private final StandardEnvironment environment;
    private final RocketMQProperties rocketMQProperties;
    private final RocketMQMessageConverter rocketMQMessageConverter;

    /**
     * @param rocketMQMessageConverter supplies the message converter set on each template
     * @param environment              used to resolve placeholders in annotation attributes
     * @param rocketMQProperties       fallback producer settings
     */
    public ExtProducerResetConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
            StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
        this.rocketMQMessageConverter = rocketMQMessageConverter;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    /**
     * Wires a producer into every bean whose ultimate target class carries
     * {@code @ExtRocketMQTemplateConfiguration} and that is not a scoped-proxy target.
     *
     * @param bean     the bean being initialized
     * @param beanName the bean's name
     * @return the result of the default {@link BeanPostProcessor} implementation
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
        if (clazz.isAnnotationPresent(ExtRocketMQTemplateConfiguration.class)
                && !ScopedProxyUtils.isScopedTarget(beanName)) {
            this.registerTemplate(beanName, bean);
        }
        return BeanPostProcessor.super.postProcessBeforeInitialization(bean, beanName);
    }

    /**
     * Creates the producer described by the bean's annotation and assigns it, together
     * with the message converter, to the template. The producer is not started here.
     *
     * @param beanName name of the template bean; also used as the producer instance name
     * @param bean     the template bean instance
     * @throws IllegalStateException if the bean is not a {@code RocketMQTemplate}
     */
    private void registerTemplate(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
        if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
        }

        ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
        validate(annotation, genericApplicationContext);

        DefaultMQProducer mqProducer = createProducer(annotation);
        // Set instanceName same as the beanName
        mqProducer.setInstanceName(beanName);
        // No explicit mqProducer.start() here: per the article, the template's
        // afterPropertiesSet starts the producer -- verify for your rocketmq-spring version.

        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
        rocketMQTemplate.setProducer(mqProducer);
        rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        log.info("Set real producer to :{} {}", beanName, annotation.value());
    }

    /**
     * Builds a producer from the annotation attributes. Empty strings, and numeric
     * attributes equal to {@code -1}, fall back to the configured producer properties.
     *
     * @param annotation the template's configuration annotation
     * @return a configured producer that has not been started
     */
    private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) {
        RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
        if (producerConfig == null) {
            producerConfig = new RocketMQProperties.Producer();
        }

        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        String groupName = environment.resolvePlaceholders(annotation.group());
        groupName = StringUtils.isEmpty(groupName) ? producerConfig.getGroup() : groupName;

        String ak = environment.resolvePlaceholders(annotation.accessKey());
        ak = StringUtils.isEmpty(ak) ? producerConfig.getAccessKey() : ak;
        String sk = environment.resolvePlaceholders(annotation.secretKey());
        sk = StringUtils.isEmpty(sk) ? producerConfig.getSecretKey() : sk;

        boolean isEnableMsgTrace = annotation.enableMsgTrace();
        String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic());
        customizedTraceTopic = StringUtils.isEmpty(customizedTraceTopic) ? producerConfig.getCustomizedTraceTopic() : customizedTraceTopic;

        // Any value other than "true" (case-insensitive) leaves TLS disabled.
        boolean useTLS = Boolean.parseBoolean(environment.resolvePlaceholders(annotation.tlsEnable()));

        DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);
        producer.setNamesrvAddr(nameServer);
        producer.setSendMsgTimeout(annotation.sendMessageTimeout() == -1 ? producerConfig.getSendMessageTimeout() : annotation.sendMessageTimeout());
        producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendFailed());
        producer.setRetryTimesWhenSendAsyncFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendAsyncFailed() : annotation.retryTimesWhenSendAsyncFailed());
        producer.setMaxMessageSize(annotation.maxMessageSize() == -1 ? producerConfig.getMaxMessageSize() : annotation.maxMessageSize());
        producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold() == -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold());
        producer.setRetryAnotherBrokerWhenNotStoreOK(annotation.retryNextServer());
        producer.setUseTLS(useTLS);
        producer.setNamespace(annotation.namespace());
        return producer;
    }

    /**
     * Rejects an annotation whose {@code value()} is already in use as a bean name.
     *
     * @throws BeanDefinitionValidationException if the name is already in use
     */
    private void validate(ExtRocketMQTemplateConfiguration annotation,
            GenericApplicationContext genericApplicationContext) {
        if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
            // %s, not the SLF4J-style {}: String.format leaves "{}" untouched.
            throw new BeanDefinitionValidationException(String.format("Bean %s has been used in Spring Application Context, " +
                    "please check the @ExtRocketMQTemplateConfiguration",
                annotation.value()));
        }
    }
}