文章目录
依赖
<!-- RabbitMQ (Spring AMQP) starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- MyBatis-Plus starter -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.1.tmp</version>
</dependency>
<!-- Swagger2 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.7.0</version>
</dependency>
<!-- Third-party Swagger UI -->
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>swagger-bootstrap-ui</artifactId>
<version>1.9.6</version>
</dependency>
<!-- Mail starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<!-- Spring Data Redis starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
sql表
-- Mail delivery log table, uniquely keyed by msgId.
-- status: 0 = delivering, 1 = delivered, 2 = delivery failed (see the column COMMENT).
-- count / tryTime hold the retry counter and the retry time.
DROP TABLE IF EXISTS `t_mail_log`;
CREATE TABLE `t_mail_log` (
`msgId` VARCHAR(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '消息id',
`eid` INT(11) DEFAULT NULL COMMENT '接收员工id',
`status` INT(1) DEFAULT NULL COMMENT '状态(0:消息投递中 1:投递成功 2:投递失败)',
`routeKey` VARCHAR(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '路由键',
`exchange` VARCHAR(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '交换机',
`count` INT(1) DEFAULT NULL COMMENT '重试次数',
`tryTime` DATETIME DEFAULT NULL COMMENT '重试时间',
`createTime` DATETIME DEFAULT NULL COMMENT '创建时间',
`updateTime` DATETIME DEFAULT NULL COMMENT '更新时间',
UNIQUE KEY `msgId` (`msgId`) USING BTREE
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
/*Data for the table `t_mail_log` */
-- Sample row with status 1 (delivered).
INSERT INTO `t_mail_log`(`msgId`,`eid`,`status`,`routeKey`,`exchange`,`count`,`tryTime`,`createTime`,`updateTime`) VALUES
('123',538,1,'mail.routing.key','mail.exchange',0,'2021-09-04 21:11:56','2021-09-04 21:10:56','2021-09-04 21:10:56');
application.yml配置文件
# NOTE(review): this file contains live-looking hosts and credentials (mail
# authorization token, RabbitMQ password). They should be supplied through
# environment variables or an external secret store instead of being committed.
spring:
  redis:
    # Timeout
    timeout: 10000ms
    # Server address
    host: 106.14.223.42
    # Server port
    port: 6379
    # Database index
    database: 0
  mail:
    host: smtp.163.com
    username: zsjia8@163.com # sender mailbox (163.com account, matching the SMTP host above)
    password: CVSXFAKKHTSAWCZY # SMTP authorization token
    default-encoding: utf-8 # encoding
    protocol: smtp # protocol
    port: 25
  # RabbitMQ settings
  rabbitmq:
    username: admin
    password: admin
    host: 106.14.223.42
    port: 5672
    # Publisher confirm callback
    publisher-confirm-type: correlated
    # Publisher return (delivery failure) callback
    publisher-returns: true
    # Consumers acknowledge messages manually
    listener:
      simple:
        acknowledge-mode: manual
  thymeleaf:
    cache: false
    prefix: classpath:/templates/
    suffix: .html
  profiles:
    active: pro
comment.avatar: /images/avatar.jpg
server:
  port: 8099
mybatis-plus:
  # Location of the mapper XML files
  mapper-locations: classpath*:/mapper/*Mapper.xml
  # Package scanned for MyBatis type aliases (the default alias is the class name)
  type-aliases-package: com.ljh.po
  configuration:
    # Automatic underscore-to-camelCase mapping (disabled here)
    map-underscore-to-camel-case: false
实体类MailLog
package com.ljh.po;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* <p>
* Entity mapped to the {@code t_mail_log} table. Each instance carries the
* message id, the receiving employee id, the delivery status and the retry
* bookkeeping (count and timestamps) of one mail message.
* </p>
*
* @author lijiahao
* @since 2021-09-01
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("t_mail_log")
@ApiModel(value="MailLog对象", description="")
public class MailLog implements Serializable {
private static final long serialVersionUID = 1L;
// Message id.
@ApiModelProperty(value = "消息id")
private String msgId;
// Id of the receiving employee.
@ApiModelProperty(value = "接收员工id")
private Integer eid;
// Status: 0 = delivering, 1 = delivered, 2 = delivery failed.
@ApiModelProperty(value = "状态(0:消息投递中 1:投递成功 2:投递失败)")
private Integer status;
// Routing key.
@ApiModelProperty(value = "路由键")
private String routeKey;
// Exchange name.
@ApiModelProperty(value = "交换机")
private String exchange;
// Retry count.
@ApiModelProperty(value = "重试次数")
private Integer count;
// Retry time.
@ApiModelProperty(value = "重试时间")
private LocalDateTime tryTime;
// Creation time.
@ApiModelProperty(value = "创建时间")
private LocalDateTime createTime;
// Last update time.
@ApiModelProperty(value = "更新时间")
private LocalDateTime updateTime;
}
公用类MailConstants
package com.ljh.po;
/**
 * Shared constants for the RabbitMQ mail pipeline: delivery status codes,
 * retry limits, and the queue / exchange / routing-key names.
 *
 * <p>This is a constants holder only; it cannot be instantiated or extended.
 */
public final class MailConstants {
    /** Status: the message is still being delivered. */
    public static final Integer DELIVERING = 0;
    /** Status: the message was delivered successfully. */
    public static final Integer SUCCESS = 1;
    /** Status: delivery of the message failed. */
    public static final Integer FAILURE = 2;
    /** Maximum number of retries. */
    public static final Integer MAX_TRY_COUNT = 3;
    /** Message timeout; callers add it to the current time with {@code plusMinutes}. */
    public static final Integer MSG_TIMEOUT = 1;
    /** Queue name. */
    public static final String MAIL_QUEUE_NAME = "mail.queue";
    /** Exchange name. */
    public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
    /** Routing key. */
    public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";

    private MailConstants() {
        // Constants holder: no instances.
    }
}
RabbitMq配置类RabbitMqConfig
/**
 * RabbitMQ configuration for the mail pipeline.
 *
 * <p>Builds a {@link RabbitTemplate} with publisher confirm / return callbacks
 * and declares the mail queue, the direct exchange and the binding between them.
 */
@Configuration
public class RabbitMqConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqConfig.class);

    /** Header from which the mail listener reads the message id. */
    private static final String CORRELATION_HEADER = "spring_returned_message_correlation";

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;
    @Autowired
    private IMailLogService mailLogService;

    /**
     * Creates the template used to publish mail messages.
     *
     * @return a template with confirm and return callbacks installed
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        // This template is built by hand, so the mandatory flag has to be set
        // explicitly; without it the broker does not return unroutable messages
        // and the return callback below is never invoked.
        rabbitTemplate.setMandatory(true);

        /*
         * Confirm callback: tells whether the message reached the broker.
         * correlationData - unique message identifier (msgId); may be null when
         *                   the sender did not supply one
         * ack             - true when the broker accepted the message
         * cause           - failure reason
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String msgId = correlationData == null ? null : correlationData.getId();
            if (!ack) {
                LOGGER.error("消息发送失败==========>{}, cause: {}", msgId, cause);
                return;
            }
            LOGGER.info("消息发送成功========》{}", msgId);
            if (msgId != null) {
                // Mark the log row as delivered.
                mailLogService.update(new UpdateWrapper<MailLog>()
                        .set("status", MailConstants.SUCCESS)
                        .eq("msgId", msgId));
            }
        });

        /*
         * Return callback: invoked when the message could not be routed to a queue.
         * msg        - the returned message
         * repCode    - reply code
         * repText    - reply text
         * exchange   - exchange the message was sent to
         * routingkey - routing key used
         */
        rabbitTemplate.setReturnCallback((msg, repCode, repText, exchange, routingkey) -> {
            Object returnedId = msg.getMessageProperties().getHeaders().get(CORRELATION_HEADER);
            LOGGER.error("{}============>消息发送到queue时失败, replyCode={}, replyText={}, exchange={}, routingKey={}",
                    returnedId, repCode, repText, exchange, routingkey);
        });
        return rabbitTemplate;
    }

    /** Declares the mail queue. */
    @Bean
    public Queue queue() {
        return new Queue(MailConstants.MAIL_QUEUE_NAME);
    }

    /** Declares the direct exchange used for mail messages. */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME);
    }

    /** Binds the mail queue to the mail exchange with the mail routing key. */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME);
    }
}
Service层、Mapper层代码
/** Service interface for {@link MailLog}; extends {@code IService<MailLog>} and declares no additional methods. */
public interface IMailLogService extends IService<MailLog> {
}
/** Spring service implementing {@link IMailLogService} on top of {@code ServiceImpl<MailLogMapper, MailLog>}; adds no methods of its own. */
@Service
public class MailLogServiceImpl extends ServiceImpl<MailLogMapper, MailLog> implements IMailLogService {
}
/** Mapper for {@link MailLog}; extends {@code BaseMapper<MailLog>} and declares no additional methods. */
@Repository
@Mapper
public interface MailLogMapper extends BaseMapper<MailLog> {
}
controller层发送消息到rabbitmq
// Record the outgoing message in t_mail_log, then publish the user to RabbitMQ
// with the same id as correlation data.
String msgId = UUID.randomUUID().toString();
MailLog mailLog = new MailLog()
        .setMsgId(msgId)
        .setEid(user1.getId().intValue())
        .setStatus(MailConstants.DELIVERING)
        .setRouteKey(MailConstants.MAIL_ROUTING_KEY_NAME)
        .setExchange(MailConstants.MAIL_EXCHANGE_NAME)
        .setCount(0)
        .setTryTime(LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT))
        .setCreateTime(LocalDateTime.now())
        .setUpdateTime(LocalDateTime.now());
mailLogMapper.insert(mailLog);

CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(
        MailConstants.MAIL_EXCHANGE_NAME,
        MailConstants.MAIL_ROUTING_KEY_NAME,
        user1,
        correlationData);
消息接收,发送邮件MailReceiver
/**
 * Listens on the mail queue and sends the welcome e-mail for each received user.
 *
 * <p>Message ids that were handled successfully are stored in the Redis hash
 * {@code mail_log}; a message whose id is already present is acknowledged
 * without sending another mail.
 */
@Component
public class MailReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);

    /** Redis hash holding the ids of messages that were already consumed. */
    private static final String MAIL_LOG_KEY = "mail_log";
    /** Message header from which the message id (msgId) is read. */
    private static final String CORRELATION_HEADER = "spring_returned_message_correlation";

    @Autowired
    private JavaMailSender javaMailSender;
    @Autowired
    private MailProperties mailProperties;
    @Autowired
    private TemplateEngine templateEngine;
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Handles one message from the mail queue.
     *
     * @param message the received message; its payload is cast to {@code User}
     * @param channel the channel used to ack / nack the delivery manually
     * @throws IOException if acknowledging or rejecting the delivery fails
     */
    @RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
    public void handler(Message message, Channel channel) throws IOException {
        // Employee entity carried by the message.
        User user = (User) message.getPayload();
        MessageHeaders headers = message.getHeaders();
        // Delivery tag needed for manual ack / nack.
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // Message id (msgId); null when the header is absent.
        String msgId = (String) headers.get(CORRELATION_HEADER);
        HashOperations hashOperations = redisTemplate.opsForHash();
        try {
            // Ask Redis for this single field instead of loading the whole hash.
            if (msgId != null && Boolean.TRUE.equals(hashOperations.hasKey(MAIL_LOG_KEY, msgId))) {
                LOGGER.warn("消息已经消费过了============》{}", msgId);
                // tag: delivery tag; false: acknowledge only this delivery.
                channel.basicAck(tag, false);
                return;
            }

            MimeMessage msg = javaMailSender.createMimeMessage();
            MimeMessageHelper helper = new MimeMessageHelper(msg);
            // Sender
            helper.setFrom(mailProperties.getUsername());
            // Recipient address
            helper.setTo(user.getEmail());
            // Sent date
            helper.setSentDate(new Date());
            // Subject
            helper.setSubject("入职欢迎邮件");
            // Body rendered from the "mail" template
            Context context = new Context();
            context.setVariable("name", user.getNickname());
            String mail = templateEngine.process("mail", context);
            helper.setText(mail, true);
            // Send the mail
            javaMailSender.send(msg);

            if (msgId != null) {
                // Remember the id so a redelivery of the same message is skipped.
                hashOperations.put(MAIL_LOG_KEY, msgId, "ok");
            } else {
                // Without an id there is nothing to de-duplicate on; attempting the
                // put would fail after the mail was sent and cause a requeue/resend.
                LOGGER.warn("message has no {} header; duplicate detection skipped", CORRELATION_HEADER);
            }
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // Log first (with the stack trace) so the cause is kept even if the nack fails.
            LOGGER.error("邮件发送失败======={}", e.getMessage(), e);
            // tag: delivery tag; false: only this delivery; true: put it back on the queue.
            // NOTE(review): a permanently failing message is requeued every time;
            // consider a dead-letter exchange or a redelivery limit.
            channel.basicNack(tag, false, true);
        }
    }
}
定时任务检查消息是否发送(记得在启动类上添加@EnableScheduling)
/**
 * Scheduled job that re-publishes mail messages which are still in the
 * "delivering" state after their retry time has passed.
 */
@Component
public class MailTask {
    @Autowired
    private IMailLogService mailLogService;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private loginRepository loginRepository;

    /**
     * Runs every 10 seconds. For each overdue message still in the delivering
     * state: marks it as failed once the retry limit is reached, otherwise
     * bumps the retry bookkeeping and publishes the message again.
     */
    @Scheduled(cron = "0/10 * * * * ?")
    public void mailtask() {
        // Messages still being delivered whose retry time is earlier than now.
        List<MailLog> overdue = mailLogService.list(new QueryWrapper<MailLog>()
                .eq("status", MailConstants.DELIVERING)
                .lt("tryTime", LocalDateTime.now()));

        for (MailLog mailLog : overdue) {
            // The count column is nullable; treat a missing value as "never retried".
            int tried = mailLog.getCount() == null ? 0 : mailLog.getCount();

            if (tried >= MailConstants.MAX_TRY_COUNT) {
                // Retry limit reached: mark as failed and do NOT publish again.
                mailLogService.update(new UpdateWrapper<MailLog>()
                        .set("status", MailConstants.FAILURE)
                        .eq("msgId", mailLog.getMsgId()));
                continue;
            }

            // Update retry count, update time and next retry time.
            mailLogService.update(new UpdateWrapper<MailLog>()
                    .set("count", tried + 1)
                    .set("updateTime", LocalDateTime.now())
                    .set("tryTime", LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT))
                    .eq("msgId", mailLog.getMsgId()));

            // Load the user the message is about and publish it again under the same msgId.
            User user = loginRepository.getOne(mailLog.getEid().longValue());
            rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME, MailConstants.MAIL_ROUTING_KEY_NAME,
                    user, new CorrelationData(mailLog.getMsgId()));
        }
    }
}
前端邮件模板mail.html
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>欢迎邮件</title>
</head>
<body>
<!--/* Welcome mail body; "name" is supplied through the template context. */-->
欢迎 <span th:text="${name}"></span>,成功注册在线笔记系统
</body>
</html>
版权声明:本文为weixin_45386898原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。