JMS消息概述
JMS的全称是Java Message Service,即Java消息服务。用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信
它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息
JMS支持两种发送和接收消息的模型
P2P模型
采用点到点的方式发送信息,P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中获取消息,P2P每个消息只有一个消费者,发送者和接受者在时间上没有依赖性
Pub/Sub模型
发布和订阅模型定义了如何向一个内容节点发布和订阅消息,这个内容节点称为Topic(主题),主题可以认为是消息传递的中介,消息发布者将消息发布到某一个主题,消息订阅者则从主题中订阅消息
SpringBoot集成ActiveMQ
导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
编写application.properties配置
###ActiveMQ配置
spring.activemq.broker-url=tcp:localhost:61616
spring.activemq.in-memory=true
spring.activemq.pool.enabled=false
spring.activemq.packages.trust-all=true
创建数据库表mood
create table mood(
id varchar(32) primary key not null,
content varchar(256) not null,
user_id varchar(32) not null,
praise_num int(11) not null,
publish_time datetime not null);
alter table mood add index mood_user_id_index(user_id);
根据数据库创建实体类
package com.xf.demo03.model;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
import java.util.Date;
/**
* @author xfgg
*/
@Data
@Entity
@Table(name="mood")
public class Mood implements Serializable {
@Id
private String id;
private String content;
private String userId;
private Integer praiseNum;
private Date publishTime;
}
创建jpa接口
package com.xf.demo03.repository;
import com.xf.demo03.model.Mood;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* @author xfgg
*/
public interface MoodRepository extends JpaRepository<Mood,String> {
}
创建service接口
package com.xf.demo03.service;
import com.xf.demo03.model.Mood;
/**
* @author xfgg
*/
public interface MoodService {
/**
* 保存Mood对象
* @param mood
* @return
*/
Mood save(Mood mood);
}
创建service实现服务
package com.xf.demo03.service.impl;
import com.xf.demo03.model.Mood;
import com.xf.demo03.repository.MoodRepository;
import com.xf.demo03.service.MoodService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author xfgg
*/
@Service
public class MoodServiceImpl implements MoodService {
@Autowired
private MoodRepository moodRepository;
@Override
public Mood save(Mood mood) {
return moodRepository.save(mood);
}
}
创建test方法
@Resource
private MoodService moodService;
@Test
public void testmood(){
Mood mood = new Mood();
mood.setId("1");
mood.setUserId("1");
mood.setPraiseNum(0);
mood.setContent("这是我的第一条说说");
mood.setPublishTime(new Date());
moodService.save(mood);
}
}
```
使用ActiveMQ作异步消费,减轻用户并发发表说说而产生的压力,提高系统的整体性能
创建生产者
```java
package com.xf.demo03.service.producer;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Destination;
/**
* @author xfgg
*/
@Service
public class MoodProducer {
@Resource
private JmsMessagingTemplate jmsMessagingTemplate;
/**
*JmsMessagingTemplate是发送消息的工具类,封装了JmsTemplate
* @param destination 发送到的队列
* @param message 待发送的消息
*/
public void sendMessage(Destination destination,final String message){
jmsMessagingTemplate.convertAndSend(destination,message);
}
}
```
创建消费者
```java
package com.xf.demo03.service.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* @author xfgg
*/
@Component
public class MoodConsumer {
/**
* JmsListener配置消费者监听的队列moodqueue
* @param text 接收到的信息
*/
@JmsListener(destination = "mood.queue")
public void receiveQueue(String text){
System.out.println("用户发表说说["+text+"]成功");
}
}
```
测试
```java
@Resource
private MoodProducer moodProducer;
@Test
public void testActiveMQ(){
Destination destination = new ActiveMQQueue("mood.queue");
moodProducer.sendMessage(destination,"hello,ActiveMQ");
}
}
```
添加异步保存接口
```java
/**
* 使用Active消息缓存中间价
* @param mood
* @return
*/
String asynSave(Mood mood);
```
实现接口方法调用生产者的sendMessage推送信息
```java
/**
* 队列
*/
private static Destination destination = new ActiveMQQueue("mood.queue.asyn.save");
@Resource
private MoodProducer moodProducer;
@Override
public String asynSave(Mood mood) {
/**
* 调用生产者发送信息
*/
moodProducer.sendMessage(destination,mood);
return "success";
}
```
生产者下添加sendMessage发送Mood对象
```java
public void sendMessage(Destination destination, Mood mood){
jmsMessagingTemplate.convertAndSend(destination,mood);
}
```
消费者队列保存信息
```java
@Resource
private MoodService moodService;
@JmsListener(destination = "mood.queue.asyn.save")
public void receiveQueue(Mood mood){
moodService.save(mood);
}
```
测试
```java
@Test
public void testActiveMQAsyn(){
Mood mood = new Mood();
mood.setId("2");
mood.setUserId("2");
mood.setPublishTime(new Date());
mood.setContent("这是我的第二条说说");
mood.setPraiseNum(0);
String msg = moodService.asynSave(mood);
System.out.println("异步发表说说:"+msg);
}
```
版权声明:本文为qq_42337039原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。