canal部署和配置

参考地址

https://github.com/alibaba/canal/wiki/AdminGuide

概念

canal-admin:控制台,可配置server和instance。

canal-server:代表一个canal运行实例,对应于一个jvm;内部包含多个instance,每个instance都会伪装成一个mysql实例的slave

instance:对应于一个数据队列

架构

img

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

环境准备

数据库环境准备

mysql需要开启binlog,修改my.cnf文件,添加如下配置:

[mysqld]  
log-bin=mysql-bin #添加这一行就ok  
binlog-format=ROW #选择row模式  
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复  

ps:canal支持所有模式的增量订阅(但配合同步时,因为statement只有sql,没有数据,无法获取原始的变更日志,所以一般建议为ROW模式)

创建canal账户

CREATE USER canal IDENTIFIED BY 'canal';    
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';  
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;  
FLUSH PRIVILEGES; 

1.1.4版本安装

安装canal admin

下载地址

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

解压到指定目录下

tar zxvf canal.admin-$version.tar.gz -C /指定目录

修改配置文件

修改文件:application vi conf/application.yml

内容如下:

server:
  port: 8080
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

上述配置中,需要修改的地方有5处:

  1. server.port:修改为应用端口号
  2. spring.datasource.address:canal admin的数据库的地址
  3. spring.datasource.database:数据库名称,默认canal_manager不变
  4. spring.datasource.username:数据库用户名
  5. spring.datasource.password:数据库密码
  6. canal.adminUser:canal admin账号
  7. canal.adminPasswd:canal admin密码

创建上面数据表及表结构

导入初始化sql,路径在项目canal admin下:conf/canal_manager.sql

启动canal-admin

命令:sh bin/startup.sh

默认登入账号密码是:admin/123456

安装canal-server

下载地址

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

解压到指定目录下

tar -zxvf canal.deployer-1.1.1.tar.gz -C /指定目录

修改配置文件

server先使用local启动,先修改local配置文件,别的配置在控制台修改

修改配置文件/conf/canal_local.properties

需要修改的地方:

  1. canal.admin.manager ,配置canal-admin的地址
  2. canal.admin.user:和admin中的配置统一,对应canal.adminUser
  3. canal.admin.passwd:对应admin中的canal.adminPasswd 的密文(ps:可通过sql命令获取密码:select password(123456); 去掉结果最前面的*就是密码)
  4. canal.admin.register.cluster:集群的名称

启动canal-server

命令:sh bin/startup.sh local

控制台配置

集群管理配置

ps:单机server无需配置集群

新建集群

1.新建集群需要zk地址:

在这里插入图片描述

2.修改集群的主配置

选择载入模板,需要修改模板的配置项如下:

  1. canal.admin.manager :canal-admin的地址

  2. canal.zkServers:zk的地址

  3. 加入配置项:canal.instance.global.spring.xml= classpath:spring/default-instance.xml,代表binlog的读取位置会记录内存和zk

  4. canal.serverMode = RocketMQ:发送rocketmq消息

  5. canal.mq.servers :rocketmq的地址

  6. canal.admin.user:和admin中的配置统一,对应canal.adminUser

  7. canal.admin.passwd:对应admin中的canal.adminPasswd 的密文

    rocketmq集群配置地址时,多个地址之间用;隔开

server管理配置

新建server

在这里插入图片描述

  1. 所属集群可选择集群或者单机
  2. server ip为启动的的canal server的服务ip
  3. admin端口就11110

修改配置

当server是集群模式下时,无法修改配置,只能去对应的集群修改;集群下的server的配置相同
在这里插入图片描述
如果server是单机,修改配置可参考上诉集群主配置

instance管理配置

新建instance

在这里插入图片描述
选择载入模板

修改配置

需要修改配置项如下:

  1. canal.instance.master.address:数据库地址
  2. canal.instance.dbUsername:mysql创建的canal账户
  3. canal.instance.dbPassword:mysql创建的canal账户密码
  4. canal.instance.defaultDatabaseName:数据库名
  5. canal.instance.filter.regex:匹配的数据库表,正则表达式
  6. canal.instance.filter.query.ddl=true:忽略ddl语句(就监控dml语句,insert,update,delete)
  7. canal.mq.topic:mq的topic

1.1.5版本安装

和1.1.4版本的不同点

server配置或者集群下的主配置

  1. canal.serverMode=rocketMQ ,不是RocketMQ
  2. canal.instance.filter.query.ddl 可直接在server统一配置,无需在instance中单独配置
  3. mq配置项名称修改,比如rocketmq.namesrv.addr

rocketmq集群配置地址时,多个地址之间用;隔开

源码解读

deployer模块

该模块主要用于独立部署canal server。通过启动脚本startup.sh中的配置启动入口类(com.alibaba.otter.canal.deployer.CanalLauncher),并且设置系统变量canal.conf为指定的启动配置文件名。

CanalLauncher

启动类的源码如下:

public static void main(String[] args) {
    try {
        .....
        // 1.获取启动配置,并且加载解析配置项
        String conf = System.getProperty("canal.conf", "classpath:canal.properties");
        Properties properties = new Properties();
        if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
            conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
            properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
        } else {
            properties.load(new FileInputStream(conf));
        }
				// 2.创建一个server启动类
        final CanalStarter canalStater = new CanalStarter(properties);
      	// 3.如果配置了canal-admin控制台地址,就调控制台服务中获取server的接口
      	// (/api/{env}/config/server_polling),通过该接口,去admin对应的数据库中查询最新的server的配置
        String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
        if (StringUtils.isNotEmpty(managerAddress)) {
          // 4.user和passwd作为server和admin通信的请求头数据,类似鉴权的作用
            String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
            String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
            String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");
            boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
                CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
            String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
            String name = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_REGISTER_NAME);
            String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
            if (StringUtils.isEmpty(registerIp)) {
                registerIp = AddressUtils.getHostIp();
            }
            final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
                user,
                passwd,
                registerIp,
                Integer.parseInt(adminPort),
                autoRegister,
                autoCluster,
                name);
            PlainCanal canalConfig = configClient.findServer(null);
            if (canalConfig == null) {
                throw new IllegalArgumentException("managerAddress:" + managerAddress
                                                   + " can't not found config for [" + registerIp + ":" + adminPort
                                                   + "]");
            }
            Properties managerProperties = canalConfig.getProperties();
            // merge local
            managerProperties.putAll(properties);
            int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
                CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
                "5"));
          // 5.schedule线程,在admin控制台可以配置server,所以默认定时5秒去admin拉取最新的server配置
            executor.scheduleWithFixedDelay(new Runnable() {

                private PlainCanal lastCanalConfig;

                public void run() {
                    try {
                        if (lastCanalConfig == null) {
                            lastCanalConfig = configClient.findServer(null);
                        } else {
                            PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
                            if (newCanalConfig != null) {
                                // 6.远程配置canal.properties修改重新加载整个应用
                                canalStater.stop();
                                Properties managerProperties = newCanalConfig.getProperties();
                                // merge local
                                managerProperties.putAll(properties);
                                canalStater.setProperties(managerProperties);
                              // 7.查看下面CanalServer的源码
                                canalStater.start();

                                lastCanalConfig = newCanalConfig;
                            }
                        }

                    } catch (Throwable e) {
                        logger.error("scan failed", e);
                    }
                }

            }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
            canalStater.setProperties(managerProperties);
        } else {
          // 7.如果没有配置控制台地址,启动参数指定的配置就是server的配置
            canalStater.setProperties(properties);
        }
				// 8.启动server
        canalStater.start();
        runningLatch.await();
        executor.shutdownNow();
    } catch (Throwable e) {
        logger.error("## Something goes wrong when starting up the canal Server:", e);
    }
}

CanalStarter

CanalStarter:主要是server的启动和stop的实现

server启动的方法start()源码如下

public synchronized void start() throws Throwable {
    String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
  // 1.
    if (!"tcp".equalsIgnoreCase(serverMode)) {
      // 1.通过spi的机制来获取
        ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
        canalMQProducer = loader
            .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
        if (canalMQProducer != null) {
            ClassLoader cl = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
            canalMQProducer.init(properties);
            Thread.currentThread().setContextClassLoader(cl);
        }
    }

    if (canalMQProducer != null) {
        MQProperties mqProperties = canalMQProducer.getMqProperties();
        // disable netty
        System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
        if (mqProperties.isFlatMessage()) {
            // 设置为raw避免ByteString->Entry的二次解析
            System.setProperty("canal.instance.memory.rawEntry", "false");
        }
    }

    logger.info("## start the canal server.");
    controller = new CanalController(properties);
    controller.start();
    logger.info("## the canal server is running now ......");
    shutdownThread = new Thread(() -> {
        try {
            logger.info("## stop the canal server");
            controller.stop();
            CanalLauncher.runningLatch.countDown();
        } catch (Throwable e) {
            logger.warn("##something goes wrong when stopping canal Server:", e);
        } finally {
            logger.info("## canal server is down.");
        }
    });
    Runtime.getRuntime().addShutdownHook(shutdownThread);

    if (canalMQProducer != null) {
        canalMQStarter = new CanalMQStarter(canalMQProducer);
        String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
        canalMQStarter.start(destinations);
        controller.setCanalMQStarter(canalMQStarter);
    }

    // start canalAdmin
    String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
    if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
        String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
        String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
        CanalAdminController canalAdmin = new CanalAdminController(this);
        canalAdmin.setUser(user);
        canalAdmin.setPasswd(passwd);

        String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);

        logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",
            port,
            user,
            passwd,
            ip);

        CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
        canalAdminWithNetty.setCanalAdmin(canalAdmin);
        canalAdminWithNetty.setPort(Integer.parseInt(port));
        canalAdminWithNetty.setIp(ip);
        canalAdminWithNetty.start();
        this.canalAdmin = canalAdminWithNetty;
    }

    running = true;
}

版权声明:本文为wantmiss原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。