Canal 详解 Mysql与Redis数据同步 解决方案

要保证数据库和redis强一致性是不可能的,肯定有少许时间的不一致。canal是阿里的一套组件,用来监听mysql master发送的类似binary log的数据,然后让消息费去消费。

Canal 简单原理

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

Canal环境搭建

查看mysql是否开启了bin log

show variables like 'log_bin'

开启bin log

[root@VM-0-3-centos ~]# vim /etc/my.cnf

# binlog文件名
log-bin=mysql-bin
binlog_format=ROW
#mysql实例id 不能和canal的slaveId重复
server_id=1

##重启mysql
[root@VM-0-3-centos ~]# service mysqld restart

创建canal用户并授权

mysql> CREATE USER canal IDENTIFIED BY 'canal';
ERROR 1819 (HY000): Your password does not satisfy the current policy requirements

如果报错了,说明你的mysql不允许创建这么简单的用户名 canal,需要更改下策略

mysql> set global validate_password_policy=LOW;
mysql> set global validate_password_length=5;

给canal赋权限

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
mysql> FLUSH PRIVILEGES;

为什么要创建canal用户?

有些用户没有REPLICATION SLAVE, REPLICATION CLIENT的权限,用这些用户连接canal时,无法获取到binlog。这里的canal用户授权了全部权限,所以客户端可以从canal中获取binlog。

安装 canal server

canal指的是canal server,它会读取mysql的binlog,解析后存储起来。

下载地址: https://github.com/alibaba/canal/releases


[root@VM-0-3-centos local]# mkdir canal
[root@VM-0-3-centos local]# cd canal
[root@VM-0-3-centos ~]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz
[root@VM-0-3-centos local]# tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz

查看canal占用端口

[root@VM-0-3-centos bin]# jps
7362 Jps
5322 CanalLauncher

[root@VM-0-3-centos bin]# netstat -lnp |grep 5322
tcp        0      0 0.0.0.0:11110           0.0.0.0:*               LISTEN      5322/java           
tcp        0      0 0.0.0.0:11111           0.0.0.0:*               LISTEN      5322/java           
tcp        0      0 0.0.0.0:11112           0.0.0.0:*               LISTEN      5322/java

然后客户端代码

	public static void main(String[] args) {
		// 创建链接
		CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("xxx.xxx.xxx.xxx", 11111),
				"example", "", "");
		int batchSize = 1000;
		int emptyCount = 0;
		try {
			connector.connect();
			connector.subscribe(".*\\..*");
			connector.rollback();
			int totalEmptyCount = 120;
			while (emptyCount < totalEmptyCount) {
				Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
				long batchId = message.getId();
				int size = message.getEntries().size();
				if (batchId == -1 || size == 0) {
					emptyCount++;
					System.out.println("empty count : " + emptyCount);
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
					}
				} else {
					emptyCount = 0;
					printEntry(message.getEntries());
				}

				connector.ack(batchId); // 提交确认
			}

			System.out.println("empty too many times, exit");
		} finally {
			connector.disconnect();
		}

	}

	private static void printEntry(List<CanalEntry.Entry> entrys) {
		for (CanalEntry.Entry entry : entrys) {
			if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
					|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
				continue;
			}

			CanalEntry.RowChange rowChage = null;
			try {
				rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
			} catch (Exception e) {
				throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
						e);
			}

			CanalEntry.EventType eventType = rowChage.getEventType();

			System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
					entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
					entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

			for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
				if (eventType == CanalEntry.EventType.DELETE) {
					printColumn(rowData.getBeforeColumnsList());
				} else if (eventType == CanalEntry.EventType.INSERT) {
					printColumn(rowData.getAfterColumnsList());
				} else {
					System.out.println("-------&gt; before");
					printColumn(rowData.getBeforeColumnsList());
					System.out.println("-------&gt; after");
					printColumn(rowData.getAfterColumnsList());
				}
			}
		}
	}

	private static void printColumn(List<CanalEntry.Column> columns) {
		for (CanalEntry.Column column : columns) {
			System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
		}
	}

场景验证,修改数据库,客户端得到打印

 

强烈推荐一个 进阶 JAVA架构师 的博客

Java架构师修炼​githubs.xyz


版权声明:本文为LuoZheng4698729原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。