Syncing MySQL Data to Redis with Canal + RocketMQ: A Hands-On Guide

Original post: https://blog.xaoxu.cn/archives/canal-rocketmq-sync-mysql-data-to-redis

Preface

The cache/database consistency problem

Reads through the cache are usually unproblematic, but as soon as data is updated, consistency problems between the cache (Redis) and the database (MySQL) appear. Reads and writes run concurrently and their order cannot be guaranteed, so the cache and the database can drift apart.

Whether you delete the Redis cache first and then write MySQL, or write MySQL first and then delete the Redis cache, inconsistency is possible:

  • Delete Redis first, then write MySQL: if the cache has been deleted but MySQL has not been written yet, another thread may come in to read, miss the cache, load the old value from the database, and write it back into the cache; the cache now holds stale data.
  • Write MySQL first, then delete Redis: if MySQL has been written but the writing thread crashes before deleting the Redis cache, the stale cache entry survives and the two stores are inconsistent.

Solutions

Option 1: delayed double delete

Perform redis.del(key) both before and after the database write, with a reasonable delay in between. The steps:

  1. Delete the cache
  2. Write the database (between steps 1 and 2, a request may miss the cache, read the old value, and put that stale value back into the cache)
  3. Sleep 500 milliseconds
  4. Delete the cache again (this evicts any stale value cached between steps 1 and 2)
public void update(String key, Object data) throws InterruptedException {
    redis.delKey(key);   // 1. delete the cache
    db.updateData(data); // 2. write the database
    Thread.sleep(500);   // 3. wait out in-flight reads
    redis.delKey(key);   // 4. delete again to evict any stale value cached in between
}
Why sleep, and for how long?

The right delay depends on your project: account for how long the read-path business logic takes, plus the master/slave replication lag of Redis and the database.
The goal: by the time the write request finishes, it has deleted any stale cache entry left behind by concurrent reads.

Drawbacks

1. It couples cache logic tightly into the business code
2. It adds pure waiting time to every write request
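Both drawbacks can be softened by moving the second delete off the request thread. A minimal sketch (not the article's code) using a scheduler, with ConcurrentHashMaps standing in for Redis and MySQL; in production the delayed delete would more likely ride on a delayed MQ message:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AsyncDoubleDelete {
    static final Map<String, Object> redis = new ConcurrentHashMap<>(); // stand-in for Redis
    static final Map<String, Object> db = new ConcurrentHashMap<>();    // stand-in for MySQL
    static final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    static void update(String key, Object data) {
        redis.remove(key);  // first delete
        db.put(key, data);  // write the database
        // second delete runs later on the scheduler, so the caller does not block for 500 ms
        scheduler.schedule(() -> redis.remove(key), 500, TimeUnit.MILLISECONDS);
    }
}
```

The request returns right after the database write; any stale value cached by a racing read is evicted when the scheduled delete fires.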

Option 2: update the cache asynchronously (by subscribing to the MySQL binlog)

Incremental binlog subscription and consumption + message queue + applying the increments to Redis:

  1. Read Redis: hot data lives almost entirely in Redis
  2. Write MySQL: inserts, updates, and deletes all go to MySQL
  3. Update Redis: MySQL data changes flow through the binlog and are applied to Redis
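The application-side read path in this scheme is plain cache-aside; a minimal sketch with HashMaps standing in for Redis and MySQL (the names are illustrative, not the article's code):

```java
import java.util.HashMap;
import java.util.Map;

public class CacheAsideRead {
    static final Map<String, String> redis = new HashMap<>(); // stand-in for Redis
    static final Map<String, String> mysql = new HashMap<>(); // stand-in for MySQL

    // Read path: try Redis first; on a miss, load from MySQL and backfill the cache.
    static String read(String key) {
        String cached = redis.get(key);
        if (cached != null) {
            return cached;
        }
        String fromDb = mysql.get(key);
        if (fromDb != null) {
            redis.put(key, fromDb); // backfill so the next read hits the cache
        }
        return fromDb;
    }

    // Write path: only touch MySQL; the binlog -> MQ -> consumer pipeline refreshes Redis.
    static void write(String key, String value) {
        mysql.put(key, value);
    }
}
```

The write path never touches the cache directly; keeping Redis fresh is entirely the job of the binlog pipeline described below.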

So how do we subscribe to the binlog? How do we publish the changes to a message queue, and how do we consume them? Don't worry, read on for the hands-on part!

Hands-on

The architecture for keeping data consistent by syncing from the MySQL binlog looks roughly like this:

[Figure: Canal sync architecture]

What is Canal?

I won't repeat the introduction here; read Canal's own overview at https://github.com/alibaba/canal/wiki/%E7%AE%80%E4%BB%8B

How Canal works

  • Canal speaks the MySQL slave protocol: it pretends to be a MySQL slave and sends a dump request to the MySQL master
  • The MySQL master receives the dump request and starts pushing the binary log to the slave (i.e. Canal)
  • Canal parses the binary log objects (raw byte streams)

Once you've read the official introduction you'll have a basic picture of Canal, so let's move on to the hands-on part.

Environment setup

MySQL configuration

1. For self-hosted MySQL, first enable binlog writing and set binlog-format to ROW mode; in my.cnf:

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not clash with Canal's slaveId

Note: Aliyun RDS for MySQL has binlog enabled by default, and its accounts already carry the binlog dump privilege, so no privilege or binlog setup is needed; you can skip this step.

2. Grant the account Canal connects with the privileges of a MySQL slave; if the account already exists, just grant:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

SHOW GRANTS FOR 'canal';

Installing and configuring Canal

Installing and configuring Canal Admin

canal-admin provides ops-facing capabilities for Canal (centralized configuration management, node operations, and so on) behind a relatively friendly web UI, so that more users can operate Canal quickly and safely.

1. Download canal-admin: visit the release page and pick the package you need, e.g. version 1.1.4:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

2. Unpack

mkdir /canal/canal-admin
tar zxvf canal.admin-1.1.4.tar.gz -C /canal/canal-admin

After unpacking, enter the /canal/canal-admin directory; you'll see this layout:

drwxr-xr-x   7 xiaoxuxuy  staff   224B Mar 15 16:36 bin
drwxr-xr-x   9 xiaoxuxuy  staff   288B Mar 13 20:27 conf
drwxr-xr-x  90 xiaoxuxuy  staff   2.8K Mar 13 20:24 lib
drwxrwxrwx   5 xiaoxuxuy  staff   160B Mar 17 10:07 logs

3. Edit the configuration

vi conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

4. Initialize the metadata database

mysql -h127.1 -uroot -p

# import the init SQL
> source conf/canal_manager.sql
  • The init SQL script creates the canal_manager database by default; initialize with root or another account with super privileges
  • canal_manager.sql ships in the conf directory by default, or can be downloaded from the canal_manager.sql link

5. Start

sh bin/startup.sh

Once it's up, open http://127.0.0.1:8089/ to reach the Canal login page; default account/password: admin/123456

6. Stop

sh bin/stop.sh

Installing and configuring Canal Deployer

1. Download canal-deployer: visit the release page and pick the package you need, e.g. version 1.1.4:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2. Unpack

mkdir /canal/canal-deployer
tar zxvf canal.deployer-1.1.4.tar.gz -C /canal/canal-deployer

After unpacking, enter the /canal/canal-deployer directory; you'll see this layout:

drwxr-xr-x   7 xiaoxuxuy  staff   224B Mar 15 18:32 bin
drwxr-xr-x  11 xiaoxuxuy  staff   352B Mar 14 17:09 conf
drwxr-xr-x  83 xiaoxuxuy  staff   2.6K Mar 13 20:27 lib
drwxrwxrwx   5 xiaoxuxuy  staff   160B Mar 14 17:09 logs

3. Edit canal_local.properties; its values override canal.properties

Official note: conf currently contains both canal.properties and canal_local.properties; for backward compatibility with older versions, canal.properties remains the primary default configuration.

# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = 

4. Start

sh bin/startup.sh local

After it starts, refresh the Server management view in the Canal Admin web UI; the canal server now shows up as started.

[Screenshot: Canal Server list]

5. Edit the Canal Server configuration, changing the message-queue settings

[Screenshot: Canal Server configuration]

I'll use RocketMQ as the example here; for installing RocketMQ see: RocketMQ QuickStart

Start the RocketMQ console UI; I ran it with Docker here, but start it however you prefer:

docker run --name rocketmq-ui -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.0.7:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -itd styletang/rocketmq-console-ng:1.0.0

Then back to the Canal Server configuration file; mine reads:

#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = RocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
######### 		destinations		#############
#################################################
canal.destinations = cms_article,cms_user
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 		     MQ 		     #############
##################################################
canal.mq.servers = 192.168.0.7:9876
canal.mq.retries = 3
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = cms
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

In short, only a few settings were changed:

# ...
# tcp, kafka, RocketMQ
canal.serverMode = RocketMQ
# ...
canal.destinations = cms_article,cms_user
# ...
canal.mq.servers = 192.168.0.7:9876
# ...
canal.mq.flatMessage = true
# ...
canal.mq.producerGroup = cms
# ...

After editing, remember to click Save! The server restarts automatically after saving.

Configuring Instances in the Canal Admin web UI

New Instance -> Instance name: cms_article -> choose the cluster/host -> load the template -> edit the configuration as follows:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=cms-manage.article
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=cms_article
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

Click Save; the new instance now appears under Instance management.

[Screenshot: Canal Instance configuration]

For the demo I created two tables, one per Instance.

DROP TABLE IF EXISTS `article`;
CREATE TABLE `article` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4;

DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

[Screenshot: cms-manage database]

Implementation

Dependencies

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>	

Add the RocketMQ configuration (configure Redis yourself):

# RocketMQ configuration
rocketmq:
  name-server: 192.168.0.7:9876
  producer:
    group: cms # producer group
  consumer:
    group: cms # consumer group

Add an enum for the SQL types Canal reports

/**
 * SQL types reported by Canal
 *
 * @author xiaoxuxuy
 * @date 2022/3/13 11:19 PM
 */
@SuppressWarnings("AlibabaEnumConstantsMustHaveComment")
public enum SqlType {

    INSERT("INSERT", "insert"),
    UPDATE("UPDATE", "update"),
    DELETE("DELETE", "delete");

    private final String type;
    private final String name;

    SqlType(String type, String name) {
        this.type = type;
        this.name = name;
    }

    public String getType() {
        return this.type;
    }

    public String getName() {
        return this.name;
    }

}

Add the User and Article model classes

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

/**
 * @author xiaoxuxuy
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(description = "User model", value = "User model")
public class User extends Model<User> {

    private static final long serialVersionUID = 1L;

    @ApiModelProperty("Primary key")
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    @ApiModelProperty("Name")
    private String name;

}

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

/**
 * @author xiaoxuxuy
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(description = "Article model", value = "Article model")
public class Article extends Model<Article> {

    private static final long serialVersionUID = 1L;

    @ApiModelProperty("Primary key")
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    @ApiModelProperty("Article content")
    private String content;

}

The Canal sync service interface

import com.alibaba.otter.canal.protocol.FlatMessage;

import java.util.Collection;

/**
 * Canal sync service
 *
 * @author xiaoxuxuy
 * @date 2022/3/13 11:58 PM
 */
public interface CanalSyncService<T> {

    /**
     * Process a message
     *
     * @param flatMessage Canal MQ message
     */
    void process(FlatMessage flatMessage);

    /**
     * Handle DDL statements
     *
     * @param flatMessage Canal MQ message
     */
    void ddl(FlatMessage flatMessage);

    /**
     * Insert
     *
     * @param list inserted rows
     */
    void insert(Collection<T> list);

    /**
     * Update
     *
     * @param list updated rows
     */
    void update(Collection<T> list);

    /**
     * Delete
     *
     * @param list deleted rows
     */
    void delete(Collection<T> list);

}

An abstract Canal-RocketMQ-Redis processing service

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.annotation.TableId;
import com.scaffolding.example.canal.enums.SqlType;
import com.scaffolding.example.canal.service.CanalSyncService;
import com.scaffolding.example.exception.BusinessException;
import com.scaffolding.example.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.google.common.collect.Sets;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.ReflectionUtils;

import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.util.*;

/**
 * Abstract Canal-RocketMQ-Redis processing service
 *
 * @author xiaoxuxuy
 * @date 2022/3/13 7:07 PM
 */
@Slf4j
public abstract class AbstractCanalRocketMqRedisService<T> implements CanalSyncService<T> {

    @Resource
    private RedisTemplate redisTemplate;

    @Resource
    private RedisUtils redisUtils;

    private Class<T> classCache;

    /**
     * Get the model name
     *
     * @return the model name
     */
    protected abstract String getModelName();

    /**
     * Process a message
     *
     * @param flatMessage Canal MQ message
     */
    @Override
    public void process(FlatMessage flatMessage) {

        if (flatMessage.getIsDdl()) {
            ddl(flatMessage);
            return;
        }

        Set<T> data = getData(flatMessage);

        if (SqlType.INSERT.getType().equals(flatMessage.getType())) {
            insert(data);
        }

        if (SqlType.UPDATE.getType().equals(flatMessage.getType())) {
            update(data);
        }

        if (SqlType.DELETE.getType().equals(flatMessage.getType())) {
            delete(data);
        }

    }

    /**
     * Handle DDL statements
     *
     * @param flatMessage Canal MQ message
     */
    @Override
    public void ddl(FlatMessage flatMessage) {
        // TODO: sync DDL as well (dropped databases, cleared tables, changed columns)
    }

    /**
     * Insert
     *
     * @param list inserted rows
     */
    @Override
    public void insert(Collection<T> list) {
        insertOrUpdate(list);
    }

    /**
     * Update
     *
     * @param list updated rows
     */
    @Override
    public void update(Collection<T> list) {
        insertOrUpdate(list);
    }

    /**
     * Delete
     *
     * @param list deleted rows
     */
    @Override
    public void delete(Collection<T> list) {
        Set<String> keys = Sets.newHashSetWithExpectedSize(list.size());

        for (T data : list) {
            keys.add(getWrapRedisKey(data));
        }

        redisUtils.delAll(keys);
    }

    /**
     * Insert or update entries in Redis
     *
     * @param list rows
     */
    @SuppressWarnings("unchecked")
    private void insertOrUpdate(Collection<T> list) {
        redisTemplate.executePipelined((RedisConnection redisConnection) -> {
            for (T data : list) {
                String key = getWrapRedisKey(data);
                // serialize the key
                byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
                // serialize the value
                byte[] redisValue = redisTemplate.getValueSerializer().serialize(data);
                redisConnection.set(Objects.requireNonNull(redisKey), Objects.requireNonNull(redisValue));
            }
            return null;
        });
    }

    /**
     * Build the Redis key for an object
     *
     * @param t source object
     * @return key
     */
    protected String getWrapRedisKey(T t) {
        return getModelName() + ":" + getIdValue(t);
    }

    /**
     * Resolve the generic type argument of this class
     *
     * @return the generic Class
     */
    @SuppressWarnings("unchecked")
    protected Class<T> getTypeArgument() {
        if (classCache == null) {
            classCache = (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];
        }
        return classCache;
    }

    /**
     * Read the value of the object's field annotated with @TableId
     *
     * @param t object
     * @return id value
     */
    protected Object getIdValue(T t) {
        Field fieldOfId = getIdField();
        ReflectionUtils.makeAccessible(fieldOfId);
        return ReflectionUtils.getField(fieldOfId, t);
    }

    /**
     * Find the class field annotated with @TableId
     *
     * @return the id field
     */
    protected Field getIdField() {
        Class<T> clz = getTypeArgument();
        Field[] fields = clz.getDeclaredFields();
        for (Field field : fields) {
            TableId annotation = field.getAnnotation(TableId.class);

            if (annotation != null) {
                return field;
            }
        }
        log.error("PO class has no @TableId annotation");
        throw new BusinessException("PO class has no @TableId annotation");
    }

    /**
     * Convert the data maps in Canal's FlatMessage into typed objects
     *
     * @param flatMessage the Canal MQ message
     * @return set of typed objects
     */
    protected Set<T> getData(FlatMessage flatMessage) {
        List<Map<String, String>> sourceData = flatMessage.getData();
        Set<T> targetData = Sets.newHashSetWithExpectedSize(sourceData.size());
        for (Map<String, String> map : sourceData) {
            T t = JSON.parseObject(JSON.toJSONString(map), getTypeArgument());
            targetData.add(t);
        }
        return targetData;
    }
}
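The @TableId reflection used by getIdField/getIdValue can be exercised on its own. A stdlib-only sketch with a local stand-in annotation (mybatis-plus is not on the classpath here; the point is the reflection pattern, not the library):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class IdReflection {
    // stand-in for mybatis-plus @TableId; RUNTIME retention so reflection can see it
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    public @interface TableId {}

    public static class Article {
        @TableId public Long id;
        public String content;
        public Article(Long id, String content) { this.id = id; this.content = content; }
    }

    // same idea as getIdValue: locate the @TableId field, then read it reflectively
    public static Object idOf(Object o) throws IllegalAccessException {
        for (Field f : o.getClass().getDeclaredFields()) {
            if (f.getAnnotation(TableId.class) != null) {
                f.setAccessible(true);
                return f.get(o);
            }
        }
        throw new IllegalStateException("no @TableId field on " + o.getClass());
    }
}
```

With the key scheme above, this id is then joined with the model name into a Redis key such as Article:12.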

Subscribing to the Canal messages

RocketMQ supports broadcast consumption, which only needs consumer-side configuration; the default is cluster consumption, meaning that with multiple consumer instances configured, each message is consumed by only one instance.

For updating Redis that is exactly what we want: one instance consumes the message and applies the update.
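With canal.mq.flatMessage = true, the consumers below receive each change as a Canal FlatMessage serialized to JSON. For an insert into the article table the payload looks roughly like this (field names come from FlatMessage; the values are illustrative):

```json
{
  "id": 1,
  "database": "cms-manage",
  "table": "article",
  "pkNames": ["id"],
  "isDdl": false,
  "type": "INSERT",
  "es": 1647250000000,
  "ts": 1647250000123,
  "sql": "",
  "data": [
    {"id": "12", "content": "hello canal"}
  ],
  "old": null
}
```

process() branches on type (INSERT/UPDATE/DELETE) and getData() converts each map in data into the model class.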

import com.alibaba.otter.canal.protocol.FlatMessage;
import com.scaffolding.example.canal.Model.Article;
import com.scaffolding.example.canal.service.impl.AbstractCanalRocketMqRedisService;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Service;

/**
 * @author xiaoxuxuy
 * @date 2022/3/14 5:37 PM
 */
@Slf4j
@Service
@RocketMQMessageListener(topic = "cms_article", consumerGroup = "article")
public class TestArticleConsumer extends AbstractCanalRocketMqRedisService<Article>
        implements RocketMQListener<FlatMessage>, RocketMQPushConsumerLifecycleListener {

    @Override
    public void onMessage(FlatMessage flatMessage) {
        log.info("consumer message {}", flatMessage);
        try {
            process(flatMessage);
        } catch (Exception e) {
            log.warn(String.format("message [%s] consume failed", flatMessage), e);
            throw new RuntimeException(e);
        }
    }

    @Getter
    private final String modelName = Article.class.getSimpleName();

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // start from the latest offset (note: consumeTimestamp only takes effect with CONSUME_FROM_TIMESTAMP)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    }

}

import com.alibaba.otter.canal.protocol.FlatMessage;
import com.scaffolding.example.canal.Model.User;
import com.scaffolding.example.canal.service.impl.AbstractCanalRocketMqRedisService;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Service;

/**
 * @author xiaoxuxuy
 * @date 2022/3/14 5:37 PM
 */
@Slf4j
@Service
@RocketMQMessageListener(topic = "cms_user", consumerGroup = "user")
public class TestUserConsumer extends AbstractCanalRocketMqRedisService<User>
        implements RocketMQListener<FlatMessage>, RocketMQPushConsumerLifecycleListener {

    @Override
    public void onMessage(FlatMessage flatMessage) {
        log.info("consumer message {}", flatMessage);
        try {
            process(flatMessage);
        } catch (Exception e) {
            log.warn(String.format("message [%s] consume failed", flatMessage), e);
            throw new RuntimeException(e);
        }
    }

    @Getter
    private final String modelName = User.class.getSimpleName();

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // start from the latest offset (note: consumeTimestamp only takes effect with CONSUME_FROM_TIMESTAMP)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    }

}

Testing

Insert a row into the article table:

[Screenshot: article row inserted]

Redis now contains:

[Screenshot: Redis contents]

The RocketMQ message was consumed and the data was stored in Redis.

Enjoy life !!!🤩🤩🤩


Copyright notice: this is an original article by qq_42889280, released under the CC 4.0 BY-SA license; when reposting, include a link to the original and this notice.