canal整合rabbitmq

canal1.1.5好像就开始支持rabbitmq了,然后我下载的是1.1.6,为啥要整合rabbitmq,首先其他mq我也
不会啊,其次各有所需对吧。
在这里插入图片描述

首先要修改canal.properties文件

## tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 修改为rabbitmq 
canal.serverMode = rabbitMQ

## 配置上我们的rabbitmq信息
rabbitmq.host = 8.142.188.187
rabbitmq.virtual.host = /
rabbitmq.exchange = canal_exchange
rabbitmq.username = admin
rabbitmq.password = admin
rabbitmq.deliveryMode =

再修改instance.properties文件

# 链接数据库的信息
canal.instance.master.address=127.0.0.1:3306

# username/password 链接数据库的账号密码
canal.instance.dbUsername=root
canal.instance.dbPassword=root

# mq config  这里是rabbitmq的routerkey
canal.mq.topic=canal_key

修改这几项基本就OK了

rabbitmq配置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CanalConfig {

    @Bean
    Queue queue(){
        return  new Queue("canal_queue");
    }

    @Bean
    DirectExchange directExchange(){
        return new DirectExchange("canal_exchange");
    }

    @Bean
    Binding binding(){
        return BindingBuilder.bind(queue()).to(directExchange()).with("canal_key");
    }
}

这样我每次变动数据库,都会把变动的信息投递给rabbitmq了
在这里插入图片描述
在这里插入图片描述
这里收到的消息都是ASCLL码,所以要转一下

@RabbitListener(queues = "canal_queue")
    public void  getMsg(Message message, Channel channel, String msg){
        String[]chars=msg.split(",");
        StringBuffer stringBuffer = new StringBuffer();
        for(int i=0;i<chars.length;i++){
            stringBuffer.append((char)Integer.parseInt(chars[i]));
        }
        JSONObject jsonObject = JSONObject.parseObject(stringBuffer.toString());
        System.out.println(jsonObject);
    }

在这里插入图片描述
在这里插入图片描述
打完收工


版权声明:本文为m0_67393413原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。