使用canal进行mysql数据同步到Redis 踩坑记录

使用的数据库 是MySql 5.7
参考:https://blog.csdn.net/tb3039450/article/details/53928351
参考:https://blog.csdn.net/john1337/article/details/85166575
参考:https://www.cnblogs.com/xujishou/p/6306765.html
参考:https://blog.csdn.net/xyw591238/article/details/51965043

1. 修改数据库配置文件

找到数据库的安装目录。 注意一定是 ProgramData 下的my.ini 目录
添加
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
在这里插入图片描述
重启数据库
此电脑右键 --》管理–》服务和应用程序-》服务-》MySQL57 点击停止, 再点击启动
在这里插入图片描述

2.配置mysql数据库

创建canal用户,用来管理canal的访问权限。我们可以通过对canal用户访问权限的控制,进而控制canal能够获取的内容

1.DROP USER ‘canal’@’%’ // 先删除下需要创建的用户, 防止已存在
2.FLUSH PRIVILEGES; // 刷新授权
3.CREATE USER ‘canal’@’%’ IDENTIFIED BY ‘canal’; 创建用户
%的位置 指定该用户在哪个主机上可以登陆,此处如果用的是"localhost",是指该用户只能在本地登录,不能在另外一台机器上远程登录,如果想远程登录的话,将"localhost"改为"%",表示在任何一台电脑上都可以登录;也可以指定某台机器可以远程登录;
IDENTIFIED BY ‘canal’; 该用户的登陆密码,密码可以为空,如果为空则该用户可以不需要密码登陆服务器
4.SHOW GRANTS FOR canal 查看用户是否创建

5.GRANT SELECT, INSERT ON jaliance.* TO ‘canal’@’%’; 授予用户 SELECT 和INSERT 的权限, 在jaliance database 下的所有表
PS: privileges - 用户的操作权限,如SELECT , INSERT , UPDATE 等(详细列表见该文最后面).如果要授予所的权限则使用ALL.;databasename - 数据库名,tablename-表名,如果要授予该用户对所有数据库和表的相应操作权限则可用表示, 如..
grant all privileges on jaliance.
TO ‘canal’@’%’;

6.FLUSH PRIVILEGES;

7.GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’ IDENTIFIED BY ‘canal’; 给用户设置REPLICATION 的权限(一定是*.*, 我换成databasename.table 就不报错 1144 )

没操作完一个指令最好刷新下

3. 下载canal

下载地址:https://github.com/alibaba/canal/releases/
下载部署包
解压到一个文件夹
修改conf 里面的 canal.propertities

添加以下配置

## mysql serverId
canal.instance.mysql.slaveId = 1234

# position info
canal.instance.master.address = 127.0.0.1:3306 #改成自己的数据库地址
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password
canal.instance.dbUsername = canal #改成自己的数据库信息 
canal.instance.dbPassword = canal #改成自己的数据库信息 
canal.instance.defaultDatabaseName = jaliance #改成自己的数据库信息
canal.instance.connectionCharset = UTF-8 #改成自己的数据库信息 

# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex = 

启动canal, 双击startup.bat
窗口自运行结果
在这里插入图片描述
canal.log 结果
在这里插入图片描述
example.log 结果

在这里插入图片描述

编写java 客户端

package com.jamapro.springcloud.controller;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;


public class CanalClient {



    public static void main(String args[]) {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry( List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn( List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

    private static void redisInsert( List<Column> columns){
        JSONObject json=new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if(columns.size()>0){
            RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
        }
    }

    private static  void redisUpdate( List<Column> columns){
        JSONObject json=new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if(columns.size()>0){
            RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
        }
    }

    private static  void redisDelete( List<Column> columns){
        JSONObject json=new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if(columns.size()>0){
            RedisUtil.delKey("user:"+ columns.get(0).getValue());
        }
    }

}

package com.jamapro.springcloud.controller;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * @author lgz
 */
public class RedisUtil {

    private static Jedis jedis = null;

    public static synchronized Jedis getJedis() {
        if (jedis == null) {
            jedis = new Jedis("140.143.15.172", 6379);
            jedis.auth("jamapro");
        }
        return jedis;
    }

    public static boolean existKey(String key) {
        return getJedis().exists(key);
    }

    public static void delKey(String key) {
        getJedis().del(key);
    }

    public static String stringGet(String key) {
        return getJedis().get(key);
    }

    public static String stringSet(String key, String value) {
        return getJedis().set(key, value);
    }

    public static void hashSet(String key, String field, String value) {
        getJedis().hset(key, field, value);
    }
}

数据库创建表?
CREATE TABLE user (
id INT(11) NOT NULL,
name VARCHAR(255) DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=INNODB DEFAULT CHARSET=latin1;

插入1 条数据
INSERT INTO user (id,NAME) VALUES (1,‘zhangsan’);

控制台打印出log
在这里插入图片描述

查询redis 数据

 public static void main(String args[]) {
        String key = "user:1";
        String value = null;
        if (RedisUtil.existKey(key)) {
            value = RedisUtil.stringGet(key);
        }
        System.out.println(value);

    }

这个是我将zhangsan, 更新成wangwu 之后的结果。
控制台输出
在这里插入图片描述

亲测有效
因为项目已经开始, 数据库里有很多表了, 接下里就是研究下如何将数据库的数据导入到redis 里


版权声明:本文为qq_41572697原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。