Canal 源码分析03 -- 解析binLog

负责解析Mysql的二级制文件binlog成为event

核心类:AbstractEventParser

start方法

1. 初始化缓冲队列,然后start()启动

2. 初始化BinLogParser,然后start()启动

3.启动工作线程parseThread

4.开始执行

4.1 先构造Erosa连接

4.2 创造一个心跳线程

   

4.3 preDump 执行dump前的准备工作,具体实现在MysqlEventParser中

4.3.1 校验连接类型必须是MysqlConnection,如果不是,就直接抛出异常

4.3.2 初始化metaConnection,并且连接(connected设置为true,然后打开一个SocketChannel,通过channel 发送对应的headPackage,handshakePacket,认证AuthSwitchRequestPacket,ClientAuthenticationPacket

4.3.3 然后从metaConnection获取二进制文件的格式 BinlogFormat,判断是否在配置的支持的BinLogFormat列表中,不在就抛异常。

重复再执行一遍检查,作为doubleCheck

4.3.4 初始化tableMetaTSDB,然后放入到缓存对象tableMetaCache中

4.4 erosaConnection.connect();// 连接 (同样是开启channel,然后发送对应的package数据包

4.5 从erosaConnection中获取queryServerId

4.6 从erosaConnection中获取startPosition,调用方法findStartPosition

     

4.6.1 GTID模式下,CanalLogPositionManager里取最后的gtid;如果为空,就返回masterPosition

4.6.2 非GTID模式,调用findStartPositionInternal获取,也是先从CanalLogPositionManager获取,找不到就返回主库位点masterPosition,如果主库位点也没了,就获取从库位点standbyPosition;

如果还是找不到位点,就根据给定的时间戳,在指定的binlog中找到最接近于该时间戳(必须小于等于时间戳)的一个事务起始位置

(遍历文件,使用logDecoder解析出event事件对象,然后转化成CanalEntry,LogEventConvert的parse方法;

如果entry类型是事务终止TRANSACTIONEND,初始化entryPosition,然后先设置poisition,再设置Gtid

如果entry类型是事务终止TRANSACTIONBEGIN,初始化entryPosition,然后先设置gtid,再设置position

4.6.3 继续,如果获取到了位点

4.6.3.1 位点的sourceAddress和连接的的address相等;

如果binlog 定位位点失败,1.binlog位点删除,可以直接返回null,走oss binlog处理

2.发生了主备切换,serverId匹配不上,就发起一次基于时间戳查找合适的位点 :

endPosition : ”show master status“ 这个sql语句,对应的位点

startPosition : "show binlog events limit 1" 这个对应的位点

调用findAsPerTimestampInSpecificLogFile,在指定的文件中寻找,最接近于该时间戳(小于)的一个事务起始位置(同上)

最后返回startPosition

4.6.4 如果是事务的位点,就去寻找preTransactionStartPosition:

1.开始扫描,解析出event,然后转化成CanalEntry

2. 如果类型是TRANSACTIONBEGIN,并且事务的位点要小于上面startPosition记录一下transaction begin position

如果事务的位点大于等于startPosition,就直接退出了

3.更新startPosition为事务的位点

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

4.7 校验startPosition和table meta不能为空

4.8 erosaConnection重新连接,调用reconnect()方法,因为在找position过程中可能有状态,需要断开后重建。

4.9 调用parseAndProfilingIfNesscary解析出event,转化成entry

 

    4.9.1 首先校验一下profilingEnabled,如果是true,就获取当前时间为开始startTs

    4.9.2  调用binlogParser.parse解析 ,根据各种不同的LogEvent类型分别解析,如下

   

4.9.3 以UPDATE_ROWS_EVENT 举例,调用parseRowsEvent进行解析

(1) 先判断是否开启了rows过滤,如果开启,直接返回null

(2) 通过event解析出表结构tableMeta:首先从event获取tableMapLogevent,如果为空,直接抛出异常

(3) 接下来,要根据table的dbName和tableName获取 isHeartBeat还是isRDSHeartBeat

(4) 根据dbName和tableName拼接出fullName,校验名称必须符合nameFilter的要求

(5) rds心跳和aliSQL心跳,就构造出对应的tableMeta

(6) 校验tableMeta不能为空

(7) 进一步把eventType分类为EventType.INSERT, EventType.UPDATE 和 EventType.DELETE

其余的类型不支持

(8) 然后初始化rowChangeBuilder

(9) 分INSERT,DELETE和UPDATE类型解析,调用parseOneRow

(10) 开始解析parseOneRow,获取列信息columnInfo和表结构tableMeta

(11) 根据event创建entryPosition;

(12) 如果没有主键,新增一张临时表,先全量导入历史数据,然后老表建立trigger,增量数据也导入新表

(13) 把应用请求锁定,临时表rename为老表的名字,完成增加字段

(14) 从缓存中获取tableMeta

(15) 对于每一行的解析,开始循环每1列,获取对应的ColumnInfo,如果该列没值,就continue下一列

(16) 不解析最后一列,对于循环的当前列,初始化ColumnBuilder,设置name, isKey, mysqlType,index

(17) 获取fieldMeta, 然后获取该列的value,根据value的java类型处理,分成INTEGER,TINYINT,SMALLINT

SHORT等等; 根据value的javaType,设置sqlType,和update/before这种标志位

(18) 构建好rowChangeBuilder后,调用build获取到rowChange

(19) 根据rowChange的值,创建entry对象

4.10 然后记录一下最后的position,和最后一次有数据的时间。如果抛出了异常,需要记录一下出错的位点信息,然后继续向上抛出异常。

5. 开始dump数据

5.1 如果是并行,构建对应的processor

判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据

解析出gtidSet, 然后启动multiStageCoprocessor

调用erosaConnection.dump方法进行dump数据

1.更新配置,调用updateSettings

2.获取主库checkSum信息

3.sendBinlogDumpGTID : 构造binlogDumpCmd,然后把命令转化成对应的字节数组,写入到channel中

4.构造出DirectLogFetcher,并且初始化接收的大小,然后调用start方法启动,receivedBinlogBytes累加

调用fetcher.consume()

5. finally方法 : 关闭连接,

6. 重置缓冲队列,binlogParser重新置位

7.判断如果当前是running状态, 就sleep一段时间以后再重试


版权声明:本文为kuaipao19950507原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。