rocketmq的HA主从同步源码解读一(HAService#start)

HA主从同步总览

master和slave之间主要同步两部分信息,元数据信息复制采用基于Netty定时同步。commitLog信息复制基于jdk nio实现;HA主从同步也即rocketmq的master将数据复制给slave,分为同步复制和异步复制;同步复制比异步复制多一个写线程阻塞等待;
HA主从同步的程序入口在CommitLog#handleHA方法中,该方法中会判断只有当前broker机器处于主从同步复制模式下时,才执行该方法,写线程阻塞在该方法中会超时阻塞,时间为5秒,时间到了,则报FLUSH_SLAVE_TIMEOUT,若是异步复制模式,则该方法啥也不做。这是同步与异步的唯一区别;
主从复制之同步复制模式逻辑和主从刷盘之同步刷盘非常像,分为三部分:
第一部分是写线程,即存储消息的线程,将消息存储到映射文件后,开始提交复制请求groupTransferService.putRequest(request);其中会将GroupCommitRequest实例提交至requestsWrite集合中,并且执行waitPoint.countDown()方法,唤醒阻塞在waitPoint上线程;接着写线程会执行service.getWaitNotifyObject().wakeupAll()和request.future().get,写线程会阻塞等待;
第二部分在messageStore.start方法中,会运行haService.start方法,其中又会运行groupTransferService.start方法,groupTransferService线程的run方法中调用了waitPoint.await(xxx)方法,带超时时间的阻塞,时间为10毫秒,时间到或被其他线程采用waitPoint.countDown方法时,才会被唤醒,唤醒后首先交换requestsRead和requestsWrite集合,接着执行this.doWaitTransfer()方法,该方法中先判断requestsRead集合,若为空,则啥也不做,否不为空,则会拿到其中每个GroupCommitRequest实例,取出其中的请求复制的偏移量和从机回传给主机的最大复制偏移量做比较,若前者大于后者,则表示,主机还有剩余数据没有复制给从机,若此时写线程未超时(没超过5秒),则groupTransferService线程会超时阻塞,时间为1秒,调用的是WaitNotifyObject实例的wait方法;当超时时间到了会自动醒来或被其他线程唤醒时,会醒来;
第三部分是在线程ReadSocketService的run方法中会调用processReadEvent#notifyTransferSome,其中又会调用notifyTransferObject.wakeup()方法将GroupTransferService线程唤醒,这个唤醒意思是主机读取了从机上报的复制偏移量,那么写线程提交的主从同步数据的请求已经完成了,现在赶紧回头告诉写线程,不要让它阻塞在那儿;所以为了唤醒写线程,则先要唤醒GroupTransferService线程,再在GroupTransferService线程中执行唤醒写线程的逻辑,GroupTransferService线程醒来后,首先比较请求复制的偏移量和从机回传给主机的最大复制偏移量(比如说写线程请求复制的偏移量为10,主机复制给从机的最大偏移量为6,此时主机需要将差值4复制给从机,最终请求的偏移量等于从机上传给主机的偏移量,即表示本次主从复制完成),然后存储结果至future中,写线程此时就会get到数据,会醒来;
其实这块和同步刷盘中的groupCommitService一模一样,也是构建GroupCommitRequest实例加入到reqeustsWrite集合中;不同的是同步刷盘中没有第三部分;
异步复制就不会走上边三部分逻辑;

1)HAService#start

HAService代表broker主机,HAClient代表broker从机;HAClient是HAService的内部类,是一个Runnalble;只有在从机上HAClient实例才可以正常运行;
在BrokerController#start中,会执行messageStore.start方法,其中会判断若没有开启DLedger,则执行haService.start方法,其中会执行四个方法,依次是this.acceptSocketService.beginAccept(),this.acceptSocketService.start(),this.groupTransferService.start(),this.haClient.start();其中前三个方法属于主机中运行的方法,最后一个属于从机中运行的方法,先研究this.haClient.start方法;找到run方法;run方法中分为以下几步:

1.1)AcceptSocketService#beginAccept

设置主机serverSocketChannel相关包括:
a)ServerSocketChannel.open方法获取一个jdk中nio的ServerSocketChannel实例;
b)获取一个jdk中nio的Selector实例,这里并没有设置serverSocketChannel接收缓冲区和发送缓冲区的大小;
c)绑定ssc的端口为10912;
d)设置ssc非阻塞;
e)最后将serverSocketChannel注册进AcceptSocketService所拥有的selector中,并且关注的accept事件;

1.2)AcceptSocketService#run

acceptSocketService线程会一直while循环,退出循环的条件是线程状态变为了stopped,所以即使readSocketService线程发现已经没有数据可读了而停止循环,但只要acceptSocketService线程不停下来,当再次有数据可读时,会触发读事件,readSocketService线程会再次启动;

1.2.1)selector.select(1000)

调用的是超时阻塞,时间为1秒,当serverSocketChannel上事件为accept时,或者超时了,则自动唤醒,继续下边代码执行;由于acceptSocketService的run方法是循环方法,所以下次又会进入到该阻塞处;

1.2.2)处理accept事件

1.2.2.1)获取socketChannel

当判断为accept事件后,获取服务端serverSocketChannel对应的客户端channel即socketChannel;

1.2.2.2)实例化HAConnection

以获取到的客户端socketChannel为入参,实例化HAConnection;

1.2.2.2.1)设置socketChannel相关

设置了socketChannel接收缓冲区和发送缓冲区的大小为64kb,设置非阻塞;

1.2.2.2.2)实例化WriteSocketService和ReadSocketService

以获取到的客户端socketChannel为入参实例化写线程和读线程,读线程用于读取从机上传的数据,写线程用于主机向从机发送数据;
writeSocketService的构造方法中创建了Selector实例,保存了客户端socketChannel,将socketChannel注册进了selector中并对写事件感兴趣,最后设置写线程为后台守护线程;
readSocketService的构造方法中创建了Selector实例,保存了客户端socketChannel,将socketChannel注册进了selector中并对读事件感兴趣,最后设置读线程为后台守护线程;
通过以上设置可以看出,两个线程各自都拥有独立的selector,但是共享一个socketChannel;

1.2.2.3)HAConnection#start方法

执行conn.start方法,最后调用HAConnection的start方法;其中会执行this.readSocketService.start()和this.writeSocketService.start()方法,在主机中可以正常运行;

1.2.2.3.1)ReadSocketService#run

由于ReadSocketService类继承了Runnable接口且具有selector和socketChannel属性,所以readSocketChannel可以看做是一个线程,且其run方法一定是不断轮询,检查是否有读事件发生,即从机是否给当前主机上传了消息存储进度;退出轮询的条件是线程状态不对,或者长时间没有数据可读了,这个时间由属性haHousekeepingInterval控制,默认20s;

1.2.2.3.1.1)selector.select(1000)

超时等待,时间为1s,或者有数据可读则被唤醒;

1.2.2.3.1.2)ReadSocketService#processReadEvent

主机接收从机的数据,可能是多条,每条数据帧固定8字节,保存着从机的commitLog的进度值,主机只需要知道从机最大的commitLog进度值即可,即取出来最后一条数据中的进度值即可;该方法中首先判断byteBufferRead是否被填满了,若是的,则将byteBufferRead模式设置为读模式,复位处理进度
a)socketChannel.read(byteBufferRead)
接着进入while循环,首先从socketChannel中读取数据至byteBufferRead,和HAClient中读取数据一样,也分为三种结果;ReadSocketService类中封装了1mb大小的ByteBuffer类型的byteBufferRead用于接收数据,从socketChannel中读取数据至byteBufferRead中,返回readSize;
b)获取当前byteBufferRead中最后一个完整数据帧
readSize大于0说明成功读取到数据至buffer中,先判断是否有至少有一条数据帧可读,若有,则获取当前byteBufferRead中最后一个完整数据帧,因为最后一条数据可能存在半包,比如position为20,则表示将数据从socketChannel中拿出来填充到byteBufferRead中,position移动到了第20字节处,前边已经填充了8+8+4,所以position为8是最后一个完整数据帧的起点,因为最后一个完整数据帧中存放的从机进度是最新值,也是最大值,前边的都可以不管;
c)HAService#notifyTransferSome方法
一个broker主机可能对应多个从机,push2SlaveMaxOffset存储的是所有从机中的最大commitLog偏移量,主机本次获取到对应的从机中commitLog的最大进度值后,会比较是否大于push2SlaveMaxOffset值,若大于则替换,否则notifyTransferSome方法不做任何操作,若大于,则接着执行 GroupTransferService#notifyTransferSome方法,其中又会执行WaitNotifyObject#wakeup方法,唤醒在同一个WaitNotifyObject实例即notifyTransferObject上等待的线程即groupTransferService线程;

1.2.2.3.1.3)结束监听

最后若超过20秒没有发生通信,则跳出while循环,结束HAConnection连接;结束连接包括:设置读写线程状态分别为stopped,从list中移除当前HAConnection实例,取消selector中有值的SelectionKey实例,关闭selector,关闭socketChannel;

1.2.2.3.2)WriteSocketService#run

writeSocketService和readSocketService一样,可以看成是一个线程,在主机中可以正常运行,但前者是负责由broker主机向从机发送数据,数据帧格式是8+4+x,前12个字节固定,存储数据在commitLog中的物理偏移量,数据x的大小,x是实际数据;writeSocketService线程有12字节的ByteBuffer类型的byteBufferHeader用于存放帧头, run方法是一个while方法,开始会阻塞在selector上,当超时时间到了或socketChannel写缓冲区有空间可写时,程序会结束阻塞,继续运行;

1.2.2.3.2.1)selector.select(1000)

超时等待,时间为1s,或者socketChannel有空间可写则被唤醒;

1.2.2.3.2.2)根据从机同步进度确定主机接下来复制数据的起点

slaveRequestOffset表示从机上传到主机的同步commitLog的进度值,有三种情况,-1,0,大于0;若nextTransferFromWhere为-1,则表示还未确定主从复制起点,接着执行以下判断确定当前broker主机发送给从机的数据起点:
a)slaveRequestOffset为-1时表示从机还未上传数据到主机,则writeSocketService线程休眠10ms,再次循环阻塞在selector上;
b)slaveRequestOffset为0时表示是刚新建的一个从机,则获取当前broker主机commitLog最大的maxOffset,获取maxOffset所在的mappedFile,该文件的名字即是复制的起点,此时将其赋给nextTransferFromWhere,即从最后一个顺序写的commitLog文件开始同步数据;
c)slaveRequestOffset大于0时表示从机已经同步的commitLog偏移量,将其赋给nextTransferFromWhere;

1.2.2.3.2.3)距离上一次发送已过去5s

若已经发送完成,则接着判断通信间隔时间若大于5秒,则调用WriteSocketService#transferData方法发送心跳包以维持主从长连接状态,返回lastWriteOver,若返回值为false,则再次循环,发送8+4+x数据帧作为心跳,前8个字节存储的是nextTransferFromWhere,接着4个字节存储的是0,表示消息长度为0,x为null;
若上一轮数据未发送完,则直接调用transferData方法发送;

1.2.2.3.2.4)DefaultMessageStore#getCommitLogData

若上一轮数据已发送完成,则调用此方法从commitLog中拿到需要同步给从机的具体数据,入参为nextTransferFromWhere,返回值为SelectMappedBufferResult实例selectResult,即获取nextTransferFromWhere所在的mappedFile,并且以nextTransferFromWhere为起点直到当前mappedFile的数据有效位置建立一个buffer切片;所以这个有效数据可能远远超过数据格式最大32kb的限制;

1.2.2.3.2.5)判断selectResult是否大于32kb

一次最多只能复制32kb,所以先判断有效数据是否大于32kb,大于则size设置为32kb,更新nextTransferFromWhere+=size,再单独构建帧头;

1.2.2.3.2.6)填充byteBufferHeader

设置position为0,limit为12,先将更新前的nextTransferFromWhere放进去表示数据起点,再将size放进去表示数据大小,这样就组成了数据帧头,而具体数据就是selectResult;

1.2.2.3.2.7)WriteSocketService#transferData

a)调用socketChannel.write(byteBufferHeader)方法先发送帧头,往socketChannel写缓冲区中写数据,返回值是写成功的字节数,也分为三种情况,一是返回值大于0则表示写成功了,则更新lastWriteTimestamp时间戳;二是返回值为0,表示写缓冲器满了;三是返回值小于0表示socketChannel处于半关闭状态;当连发三次都提示socketChannel写缓冲区满了,或者byteBufferHeader中无数据了,则跳出发送帧头的循环;
b)判断是否为心跳包
当selectMappedBufferResult为null,即没有具体数据可发送时,表明此时是一个心跳包,当帧头发送完毕时返回true,否则返回false;
c)发送具体消息
只有帧头全部发送完成了,才可以发送保存在selectMappedBufferResult中的具体消息;发送消息体逻辑和帧头的逻辑一样,这是一个while循环,退出循环的条件是smbr中没有剩余数据了,或者连续尝试写三次socketChannel的写缓冲区都满了;
退出循环后判断,最后只有byteBufferHeader和selectMappedBufferResult中都没有待处理数据,则返回true,否则返回false;
当smbr中无剩余数据,即本轮消息都发送完成了,则执行selectMappedBufferResult.release方法,释放mappedByteBuffer,并且设置selectMappedBufferResult为null,释放资源,结束transferData方法,返回lastWriteOver表示是否全部发送完成;

1.2.2.3.2.8)WaitNotifyObject#allWaitForRunning方法

回到run方法中,再次新一轮循环,重复上述步骤;在发送帧头前会先判读上一轮是否发送完成即lastWriteOver是否为true,只要为false,则一直调用transferData方法,当lastWriteOver为true时,才发送新的帧头,由于nextTransferFromWhere已经更新了,所以会以该值为入参,重新在commitLog中找到buffer切片selectMappedBufferResult,若获取到的切片selectResult为null,则表示此次数据已全部同步完成,则执行WaitNotifyObject#allWaitForRunning方法,当前writeSocketService线程等待100ms;

1.2.2.3.2.9)释放资源

最后所有数据发送完成后,将selectMappedBufferResult调用release释放掉其中的mappedByteBuffer,关掉writeSocketService线程,关掉readSocketService线程,将HAConnection从list中移除,拿到selector中的不为null的SelectionKey实例,执行cancel方法,关闭selector,关闭socketChannel;
此时WriteSocketService#run方法执行完毕,回到AcceptSocketService#run方法,继续轮询,当有写事件时会再次新建HAConnection实例,启动该读和写线程;

1.2.2.4)HAService#addConnection方法

最后将HAConnection实例加入到List集合中;

1.3)GroupTransferService#run

1.3.1)waitPoint.await(10)

groupTransferService线程会10ms的超时阻塞,或者被其他线程唤醒,当执行HAService#putRequest方法时,会唤醒groupTransferService线程;醒来后,会执行swapRequests方法,交换requestsWrite和requestsRead集合;

1.3.2)GroupTransferService#doWaitTransfer方法

遍历requestsRead集合,判断当前push2SlaveMaxOffset值是否不小于groupCommitRequest实例中的nextOffset;
1.3.2.1)当不小于时表示该条消息已经主从已经同步过了,此时直接调用GroupCommitRequest#wakeupCustomer方法,会将putMessageStatus状态放进completableFuture中,由于存储消息的线程会一直阻塞在request.future().get处,此时存储消息的线程会get到结果,会醒来,将putMessageStatus结果返回;
1.3.2.2)当小于时表明该条消息并没有同步到从机,此时会调用notifyTransferObject.waitForRunning方法,groupTransferService线程会进入等待状态;在readSocketService线程的run方法中,当有读事件发生时,会执行processReadEvent方法,其中又会执行HAService#notifyTransferSome方法,判断当broker从机上传的最大偏移量大于原来的旧值push2SlaveMaxOffset,则执行groupTransferService.notifyTransferSome方法,其中会执行notifyTransferObject.wakeup方法,唤醒等待的groupTransferService线程,醒来后会再次调用GroupCommitRequest#wakeupCustomer方法;其实这个线程就是判断同步是否完成,完成了则返回结果给存储线程,否则继续等待;

1.4)HAClient#run

HAClient运行在从机上,每5s上报commitLog进度至主机,以jdk的nio轮询方式接受主机传来的commitLog数据,重难点在于半包的处理;

1.4.1)设置从机socketChannel相关

connectMaster方法中,masterAddress.get方法只有在从机中运行才会返回主机地址,主机中运行该方法返回null,从机连接主机失败时也返回null;接着会设置socketChannel相关,SocketChannel.open方法拿到一个socketChannel,设置socketChannel接收缓冲区和发送缓冲区的大小均为64kb,设置非阻塞,发起远程连接,最后将socketChannel注册进HAClient所拥有的selector中,并且关注read事件;保存commitLog最大offset;记录此时设置socketChannel时间;

1.4.2)从机每5秒上报同步进度

当前时间与设置socketChannel时间之差若大于5秒,则上报同步进度至主机;调用的系统方法socketChannel.write(xxx),每次写8个字节,表示当前从机的commitLog中最大偏移量;最后更新上报时间;

1.4.3)阻塞在selector上

调用的是超时阻塞,时间为1秒,当socketChannel上事件为read时,或者超时了,则自动唤醒,继续下边代码执行;由于HAClient的run方法是循环方法,所以下次又会进入到该阻塞处;

1.4.4)处理主机发送到从机的数据

阻塞被唤醒后,会执行processReadEvent方法,处理主机发送到从机的数据;HAClient中有两个ByteBuffer类型的实例变量,byteBufferRead和byteBufferBackup,假设分别指向堆中的区域a和b,socketChannel.read(this.byteBufferRead)将数据从socketChannel的读缓冲区读取到byteBufferRead中,byteBufferRead大小设为了4M,默认是写模式,此时limit等于容量大小,即4M,每写一条数据到byteBufferRead中,position就往后移动,直到等于limit,此时存在一个问题即最后一条数据很可能为半包,所以先判断是否是半包,若是半包则要将半包数据转移到一块备份区域byteBufferBackup所指向的堆b中,但不管是不是半包,此时都要交换byteBufferRead和byteBufferBackup的指向,即byteBufferRead指向堆b,byteBufferBackup指向堆a,并且接下来数据会往byteBufferRead指向的b中写,直到写满,而byteBufferBackup所指向的堆a则变为备份区域,若不是半包,则无需数据转移,交换指向之后,数据直接写往byteBufferRead指向的堆b中,而此时byteBufferBackup所指向的堆a变为备份;先判断byteBufferRead是否有足够的空间,若有则将数据从socketChannel读缓冲区读取到byteByteBuffer,即根据socketChannel.read(byteBufferRead)的返回值一共分为三种情况;

1.4.4.1)返回值大于0表示从channel中加载到buffer中成功

若socketChannel的读缓冲区中有数据,且写到byteBufferRead中成功了,则接着执行dispatchReadRequest方法,该方法负责将byteBufferRead中的数据存储进commitLog中;
首先要搞清楚,主机同步给从机的数据格式是怎么样的,每条数据由三部分组成,8+4+x1,其中8+4合称为帧头,8字节中存储的是本条数据在commitLog中的物理偏移量,4字节中存储的是传输的数据x1大小(不固定),x1是实际数据,最大32kb;所以当数据从主机传输到从机的socketChannel读缓冲区,接着写入byteBufferRead后,第一个if需要判断,byteBufferRead中的数据是否大于12,若大于,则第二个if中再比较是否大于12+x1,若大于,则表示至少有一帧完整数据可从byteBufferRead中提取,则首次的时候修改byteBufferRead的position为12,从12开始提取x1大小数据至数组中,再以本条数据的物理偏移量和传输的数据为入参,调用commitLog.appendData(startOffset, data)方法,即找到startOffset所在的mappedFile,将数据data追加进mappedFile中;随后找到第二条数据的起点为12+x1,假设第二条数据的格式是8+4+x2,再次重复以上判断,若两个if都成立,则表示又至少有一帧完整数据可从byeBufferRead中提取,则再次修改position为12+x1+12,从该位置开始提取x2大小的数据至数组中,再追加进mappedFile中;
当byteBufferRead中数据量小于12或者介于[12,12+x ),即小于帧头或小于完整一帧,则判断byteBufferRead中position是否小于limit,小于意味着byteBufferRead还未被填满,等于意味着填满了,没填满则继续从socketChannel读取数据到byteBufferRead中,只有填满了才会执行this.reallocateByteBuffer()方法,该方法中首先判断是否到了(4mb - dispatchPosition)的差值,差值即表示半包的大小,若半包大小大于0,则先清空备份区域byteBufferBackup,接着将半包放入备份区域byteBufferBackup中,接着交换byteBufferReader与byteBufferBackup的指向,设置byteBufferReader的position和limit,归零dispatchPosition,如果没有半包则直接交换byteBufferReader与byteBufferBackup的指向,设置byteBufferReader的position和limit,归零dispatchPosition;最后退出reallocateByteBuffer方法;

1.4.4.2)返回值为0表示读缓冲区channel中已无数据

此时执行readSizeZeroTimes++,当次数不小于3时,执行break退出循环;

1.4.4.3)返回-1表示主机socket关闭了

即socket处于半关闭状态,此时退出循环后,会立即关闭当前从机socket;


版权声明:本文为u014570939原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。