文章目录
缓存同步
缓存数据同步的常见方式有三种:
•设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新
•优势:简单、方便
•缺点:时效性差,缓存过期之前可能不一致
•场景:更新频率较低,时效性要求低的业务
•同步双写:在修改数据库的同时,直接修改缓存
•优势:时效性强,缓存与数据库强一致
•缺点:有代码侵入,耦合度高;
•场景:对一致性、时效性要求较高的缓存数据
•**异步通知:**修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
•优势:低耦合,可以同时通知多个缓存服务
•缺点:时效性一般,可能存在中间不一致状态
•场景:时效性要求一般,有多个服务需要同步
而异步实现又可以基于MQ或者Canal来实现:
1)基于MQ的异步通知:
解读:
- 商品服务完成对数据的修改后,只需要发送一条消息到MQ中。
- 缓存服务监听MQ消息,然后完成对缓存的更新
依然有少量的代码侵入。
2)基于Canal的通知
解读:
- 商品服务完成商品修改后,业务直接结束,没有任何代码侵入
- Canal监听MySQL变化,当发现变化后,立即通知缓存服务
- 缓存服务接收到canal通知,更新缓存
代码零侵入
Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
基于Canal的异步通知
安装Canal
安装和配置Canal
下面我们就开启mysql的主从同步机制,让Canal来模拟salve
1.开启MySQL主从
Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。
这里以之前用Docker运行的mysql为例:
1.1.开启binlog
打开mysql容器挂载的日志文件,我的在/tmp/mysql/conf目录:
修改文件:
vi /tmp/mysql/conf/hmy.cnf
添加内容:
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=heima #指定监听的数据库
配置解读:
log-bin=/var/lib/mysql/mysql-bin:设置binary log文件的存放地址和文件名,叫做mysql-binbinlog-do-db=heima:指定对哪个database记录binary log events,这里记录heima这个库
最终效果:
[mysqld]
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
server-id=1000
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=heima
1.2.设置用户权限
接下来添加一个仅用于数据同步的账户,出于安全考虑,这里仅提供对heima这个库的操作权限。
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;

重启mysql容器即可
docker restart mysql
测试设置是否成功:在mysql控制台,或者Navicat中,输入命令:
show master status;

2.安装Canal
2.1.创建网络
我们需要创建一个网络,将MySQL、Canal、MQ放到同一个Docker网络中:
docker network create heima
让mysql加入这个网络:
docker network connect heima mysql
2.3.安装Canal
大家可以上传到虚拟机,然后通过命令导入:
docker load -i canal.tar
canal.tar压缩包链接:https://share.weiyun.com/kHSOJF6Q 密码:7xkkc6
然后运行命令创建Canal容器:
docker run -p 11111:11111 --name canal \
-e canal.destinations=heima \
-e canal.instance.master.address=mysql:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=heima\\..* \
--network heima \
-d canal/canal-server:v1.1.5
说明:
-p 11111:11111:这是canal的默认监听端口-e canal.instance.master.address=mysql:3306:数据库地址和端口,如果不知道mysql容器地址,可以通过docker inspect 容器id来查看-e canal.instance.dbUsername=canal:数据库用户名-e canal.instance.dbPassword=canal:数据库密码-e canal.instance.filter.regex=:要监听的表名称
表名称监听支持的语法:
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2
查看日志
docker logs -f canal

3.监听Canal
Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。
我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。
不过这里我们会使用GitHub上的第三方开源的canal-starter客户端。地址:https://github.com/NormanGyllenhaal/canal-client
与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多。
引入依赖:
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
编写配置:
canal:
destination: heima # canal的集群名字,要与安装canal时设置的名称一致
server: 192.168.205.128:11111 # canal服务地址
编写监听器,监听Canal消息
创建一个类ItemHandler继承EntryHandler并实现其中的三个方法
public class ItemHandler implements EntryHandler<Item> {
@Override
public void insert(Item item) {
//写数据到redis
//写数据到JVM进程缓存
}
@Override
public void update(Item before, Item after) {
//写数据到redis
//写数据到JVM进程缓存
}
@Override
public void delete(Item item) {
//删除数据到redis
//删除数据到JVM进程缓存
}
修改Item实体类
通过@Id(标记表中的id)、@Column(标记表中与属性名不一样的字段)、@Teansient(标记不属于表中的字段)注解完成Item与数据库表字段的映射:
package com.heima.item.pojo;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import javax.persistence.Column;
import java.util.Date;
@Data
@TableName("tb_item")
public class Item {
@TableId(type = IdType.AUTO)
@Id
private Long id;//商品id
@Column(name = "name")
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
@Transient
private Integer stock;
@TableField(exist = false)
@Transient
private Integer sold;
}
在RedisHandler封装写数据到redis的方法和删除redis中的数据的方法
//封装添加缓存的方法
public void saveItem(Item item){
try {
String json = MAPPER.writeValueAsString(item);
redisTemplate.opsForValue().set("item:id:"+item.getId(),json);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
//封装删除缓存的方法
public void deleteItemById(Long id){
redisTemplate.delete("item:id"+id);
}
补全ItemHandler中的方法
@Component
@CanalTable("tb_item")//表名
public class ItemHandler implements EntryHandler<Item> {
@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache<Long,Item> itemCache;//JVM缓存
@Override
public void insert(Item item) {
//当数据库发送增加操作时,就会执行该方法
//写数据到JVM进程缓存
itemCache.put(item.getId(),item);
//写数据到redis
redisHandler.saveItem(item);
}
@Override
public void update(Item before, Item after) {
//写数据到JVM进程缓存
itemCache.put(after.getId(),after);
redisHandler.saveItem(after);
}
@Override
public void delete(Item item) {
redisHandler.deleteItemById(item.getId());
//删除JVM的缓存
itemCache.invalidate(item.getId());
}
}