maxwell mysql_Maxwell的安装和使用

安装准备工作

Maxwell=MySQL+Kafka.

1.安装MySQL

请参照之前的博客。

在安装完MySQL之后,需要修改my.cnf文件

vi /etc/my.cnf

[mysqld]

server-id = 1

binlog_format = ROW

重启MySQL,然后登陆到MySQL之后,查看是否已经修改过来:

mysql> show variables like 'binlog_format';

+---------------+-------+

| Variable_name | Value |

+---------------+-------+

| binlog_format | ROW |

+---------------+-------+

在MySQL中添加Maxwell用户,以及分配权限:

mysql> create database maxwell;

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';

mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';

mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

mysql> flush privileges;

2.安装Kafka

2.1下载安装包

https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz

2.2解压Kafaka安装包

[hadoop@hadoop001 app]$ tar -zxvf kafka_2.11-0.10.2.1.tgz

2.3启动Zookeeper

[hadoop@hadoop001 app]$ cd zookeeper-3.4.6/bin

[hadoop@hadoop001 bin]$ ./zkServer.sh status

JMX enabled by default

Using config: /home/hadoop/app/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: standalone

2.4启动Kafka

[hadoop@hadoop001 app]$ cd kafka_2.11-0.10.2.1/

[hadoop@hadoop001 kafka_2.11-0.10.2.1]$ bin/kafka-server-start.sh config/server.properties

3.安装Maxwell

3.1下载安装包

3.2解压安装

[hadoop@hadoop001 app]$ tar -zxvf maxwell-1.20.0.tar.gz

4.使用

4.1 STDOUT配置

4.1.1在MySQL中创建数据表:

create table xiaoyao(id int not null primary key,name varchar(20),age int,address varchar(20));

4.1.2开启Maxwell:

[hadoop@hadoop001 maxwell-1.20.0]$ bin/maxwell --user=maxwell --password=123456 --host='127.0.0.1' --producer=stdout

4.1.3对数据操作:

mysql> insert into xiaoyao values(1,'xiaoyao',0,'beijing');

mysql> insert into xiaoyao values(2,'xiaoyao1',20,'beijing');

mysql> update xiaoyao set age=15 where id=1;

4.1.4可以在Maxwell中可以看到控制台输出:

插入操作:

{"database":"test","table":"xiaoyao","type":"insert","ts":1553397965,"xid":494,"commit":true,"data":{"id":2,"name":"xiaoyao1","age":20,"address":"beijing"}}

更新操作(binlog会记录所有的字段,以及原先的值):

{"database":"test","table":"xiaoyao","type":"update","ts":1553398124,"xid":550,"commit":true,"data":{"id":1,"name":"xiaoyao","age":15,"address":"beijing"},"old":{"age":0}}

4.2Maxwell与Kafka结合

开启Maxwell:

bin/maxwell --user='maxwell' --password='123456' --host='127.0.0.1' \

--producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --kafka_version=0.10.2.1

开启Kakfa:

bin/maxwell --user='maxwell' --password='123456' --host='127.0.0.1' \

--producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --kafka_version=0.10.2.1

进入MySQL,修改数据:

mysql> use test;

mysql> update xiaoyao set age=30 where id=2;

开启Kafka消费:

[hadoop@hadoop001 kafka_2.11-0.10.2.1]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic maxwell --from-beginning

{"database":"test","table":"xiaoyao","type":"update","ts":1553401735,"xid":1675,"commit":true,"data":{"id":2,"name":"xiaoyao1","age":30,"address":"beijing"},"old":{"age":20}}

5.Filters

对产生的数据进行过滤:

bin/maxwell --user='maxwell' --password='mysqlmaxwellpwd' --host='localhost' \

--producer=kafka --kafka.bootstrap.servers=localhost:9092 \

--kafka_topic=maxwells --filter 'exclude: ambari.*, include: test_binlog.*'

也可以自定义过滤规则,请参考:

6.关于如何查看binlog

Maxwell vs Canal

Canal(服务端)

Maxwell(客户端+服务端)

语言

Java

Java

活跃度

活跃

活跃

HA

支持

定制 但是支持断点还原功能

数据落地

定制

落地到Kafka

分区

支持

支持

bootstrap

不支持

支持

数据格式

格式自由

json(格式固定)

文档

较详细

较详细

随机读

支持

支持


版权声明:本文为weixin_29022299原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。