Redis主从同步源码浅析-Slave端

前一篇文章写了下redis主从同步的server端代码,这里补一下slave端的。

简单来讲,看了master端就知道slave端的代码大概流程了:

  1. 中断跟本slave的下一级slave的连接,强迫其重连SYNC;
  2. 给master发送PING确认其状态是否OK;
  3. 发送SYNC要求master做RDB快照(2.8版本以上会有PSYNC的指令,也就是部分同步,下回介绍。);
  4. 接收RDB文件大小;
  5. 接收RDB文件;
  6. emptyDb()清空当前数据库,rdbLoad()重新加载新的RDB文件;
  7. 按需startAppendOnly,然后接收master过来的累积和实时更新数据;

下面分别介绍这些步骤。

零、slave初始化-启动同步流程

redis搭建slave比较简单,有2种方式,第一种是在配置文件中指定:

1slaveof 127.0.0.1 6379

这样在redis启动加载配置文件后,会设置server.masterhost等信息,同时会设置server.repl_state = REDIS_REPL_CONNECT; 这样redis会在serverCrond定时任务的后面会隔一秒调用replicationCron函数,从而开始跟master的连接;

第二种方式为启动后用上面一样的指令设置master信息,这格式化会中断跟之前的master的信息,重新跟新的master建立连接,重新SYNC数据。处理函数为:slaveofCommand。

1void slaveofCommand(redisClient *c) {
2    if (!strcasecmp(c->argv[1]->ptr,"no") && !strcasecmp(c->argv[2]->ptr,"one")) {
3        if (server.masterhost) {//已经是个slave了,需要关闭之
4//·····
5        }
6    else {
7        long port;
8        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
9            return;
10        /* Check if we are already attached to the specified slave */
11        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
12            && server.masterport == port) {
13            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
14            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
15            return;
16        }
17        /* There was no previous master or the user specified a different one,
18         * we can continue. */
19        sdsfree(server.masterhost);
20        server.masterhost = sdsdup(c->argv[1]->ptr);
21        server.masterport = port;
22        if (server.master) freeClient(server.master);//直接关闭之前的master连接,readSyncBulkPayload接收完RDB文件会设置这个的。
23 
24        disconnectSlaves(); /* Force our slaves to resync with us as well. */
25        cancelReplicationHandshake();
26 
27    //下面设置这个的状态为需要连接master, 这样在serverCron定时任务会每秒调用replicationCron,进而会调用connectWithMaster进行重连的。
28        server.repl_state = REDIS_REPL_CONNECT;
29        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)", server.masterhost, server.masterport);
30    }
31    addReply(c,shared.ok);
32}

这里当redis收到slaveof命令后,会中断跟目前的master建立的连接,然后会调用disconnectSlaves中断我自己的下一级slave,因为redis支持树形slave机制,类似mysql。

redis主从支持树形结构,所以这里需要先断开跟本slave的slave们的连接,让他们重连.这里需要关注重连后,新的数据如何同步的问题,比如我拿到RDB文件后,我需要将其复制一份给我的从库们. 实现的方式是让从库们重新发起sync指令,当然此时估计他们sync后的数据为空。

这里有个疑问,如果是树形的架构,正在同步数据的从库连接被断开,1秒后重新尝试连接,然后重新发送PING,SYNC,同步RDB文件,又重新建立了连接,这样是不是就悲剧了 ?

  1. 如果我自己slave-serve-stale-data 设置为off了,那么此时断开连接的我的二级slave们给我发送PING,SYNC指令的时候,我是不会处理的,只有info,slaveof等命令会处理,这样我的slave们无法同步成功,会因为我拒绝而在syncWithMaster里面因为阻塞读取一行”+PONG\r\n”失败而失败,再次进入REDIS_REPL_CONNECT状态尝试跟我建立连接。直到我的状态切换为REDIS_REPL_CONNECTED为止 。这种情况下没啥大问题顶多slave无法服务而已。
  2. 如果我自己的slave-serve-stale-data设置为on了,也就是我在没有跟master同步完RDB文件的过程中,还可以接受各种命令的,这个在processCommand里面检测的。那么也就可以接受下面我断开的这些slave的重连请求,包括PING,SYNC ! 这样他们又要求我做RDB快照,而且我真的去做快照,做完还发送给他们是不是悲剧了? 问题在于我还没有跟我的master同步完RDB数据的时候,我是否应该叫我的slave们立即跟我重新同步。这种情况是不是就悲剧了.额,不对,刚才测试了一下,这种情况不会发生,因为syncCommand函数开头检查了一下我自己的状态是不是在REDIS_REPL_CONNECTED,不在的话我是不会接收SYNC命令的。所以我的slave们不能立即SYNC成功,直到我自己的同步搞定了为止。否则收到”Can’t SYNC while not connected with my master”而一直报错。

上面slaveofCommand关键的代码是这一行:server.repl_state = REDIS_REPL_CONNECT;设置这个标志后replicationCron会每秒检查server.repl_state的状态进行相应的操作。如果是REDIS_REPL_CONNECT,就会调用connectWithMaster去异步连接master.

1void replicationCron(void) {
2//serverCron调用这里,注意主库和从库都可能调用这里的。
3//`````
4    /* Check if we should connect to a MASTER */
5    if (server.repl_state == REDIS_REPL_CONNECT) {
6        redisLog(REDIS_NOTICE,"Connecting to MASTER...");
7        if (connectWithMaster() == REDIS_OK) {
8            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
9        }
10    }
11//·····

来看看connectWithMaster的代码,挺简单的,就进行非阻塞的连接,设置连接的读写事件为syncWithMaster, 服务器状态server.repl_state 为 REDIS_REPL_CONNECTING。 这样如果连接成功,会调用syncWithMaster函数。

1int connectWithMaster(void) {
2    int fd;
3//非阻塞建立连接
4    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
5    if (fd == -1) {
6        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
7            strerror(errno));
8        return REDIS_ERR;
9    }
10//绑定读写事件都为syncWithMaster
11    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) {
12        close(fd);
13        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
14        return REDIS_ERR;
15    }
16 
17    server.repl_transfer_lastio = server.unixtime;
18    server.repl_transfer_s = fd;
19    server.repl_state = REDIS_REPL_CONNECTING;
20    return REDIS_OK;
21}

继续走syncWithMaster,syncWithMaster其实是个状态机,从发送PING,发送SYNC,等待结果,一个个处理。

一、发送PING消息

发送PING消息为了简单,redis是调用syncWrite同步阻塞发送的,发送完后将server.repl_state 设置为 REDIS_REPL_RECEIVE_PONG;也就是等待PING的状态。下次连接可读、写的时候会调用本函数去读取PING的结果。

1void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
2//`````
3    /* If we were connecting, it's time to send a non blocking PING, we want to
4     * make sure the master is able to reply before going into the actual
5     * replication process where we have long timeouts in the order of
6     * seconds (in the meantime the slave would block). */
7//如果之前正在REDIS_REPL_CONNECTING,现在有可读可写事件了,说明连接成功了,下一步就是需要发送PING请求
8    if (server.repl_state == REDIS_REPL_CONNECTING) {
9        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
10        /* Delete the writable event so that the readable event remains
11         * registered and we can wait for the PONG reply. */
12        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
13        server.repl_state = REDIS_REPL_RECEIVE_PONG;//设置为PING命令已经发送完毕
14        /* Send the PING, don't check for errors at all, we have the timeout
15         * that will take care about this. */
16        syncWrite(fd,"PING\r\n",6,100);//同步阻塞发送PING命令,这样服务端会返回"+PONG\r\n"字符串的
17        //这里实现的比较简单,就是发送完PING后,当连接可读可写时,再进syncWithMaster这个函数的时候
18        //下面的代码会判断PONG这个动作,然后就会阻塞去读取一行回复,看他是不是成功了。
19        return;
20    }
21    /* Receive the PONG command. */
22    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
23        char buf[1024];
24//上面在REDIS_REPL_CONNECTING状态的时候给master发送了一行PING指令,这样master会返回"+PONG\r\n"的,
25//现在连接可读或者可写了,所以我们阻塞读取这么多数据,判断返回是否OK。
26        /* Delete the readable event, we no longer need it now that there is
27         * the PING reply to read. */
28        aeDeleteFileEvent(server.el,fd,AE_READABLE);
29 
30        /* Read the reply with explicit timeout. */
31        buf[0] = '\0';
32        if (syncReadLine(fd,buf,sizeof(buf), server.repl_syncio_timeout*1000) == -1) {
33            redisLog(REDIS_WARNING, "I/O error reading PING reply from master: %s",strerror(errno));
34            goto error;
35        }
36//·····

后面如果需要进行AUTH验证,就会给服务器发送AUTH指令验证身份:sendSynchronousCommand(fd,”AUTH”,server.masterauth,NULL);

二、发送SYNC请求RDB 快照

这个通过syncWrite发送一条SYNC指令过去,然后准备一个临时文件打开接收数据,将连接的可读事件设置为readSyncBulkPayload就行了。然后守卫工作将server.repl_state 这个小状态机设置为REDIS_REPL_TRANSFER,也就是准备接收RDB文件过程中。

1//到这里的话,连接肯定成功了,而且PING指令也都收到了回复。所以果断发送SYNC指令
2    /* Issue the SYNC command */
3    if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
4        redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5            strerror(errno));
6        goto error;
7    }
8 
9    /* Prepare a suitable temp file for bulk transfer */
10    while(maxtries--) {
11        snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(longint)getpid());
12        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
13        if (dfd != -1) break;
14        sleep(1);
15    }
16    if (dfd == -1) {
17        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
18        goto error;
19    }
20//SYNC命令已经发送了,以后的可读可写事件就依靠readSyncBulkPayload来读取解析了。
21    /* Setup the non blocking download of the bulk file. */
22    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
23            == AE_ERR)
24    {
25        redisLog(REDIS_WARNING,
26            "Can't create readable event for SYNC: %s (fd=%d)",
27            strerror(errno),fd);
28        goto error;
29    }
30 
31    server.repl_state = REDIS_REPL_TRANSFER;//进入数据传输阶段。

三、接收RDB文件

上面看到了,发送SYNC指令后,跟master的连接的可读事件设置为readSyncBulkPayload了,函数读取master发过来的RDB大小以及文件内容保存到本地文件中,如果读取完毕,那么调用rdbLoad加载文件内容。并考虑重新启动startAppendOnly。这个读取是异步的,所以如果需要,这个过程中redis还是可以处理请求。当然slave-serve-stale-data 得设置为on才行。

先读取RDB文件总大小:

1void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
2//读取master发过来的RDB大小以及文件内容保存到本地文件中;
3//如果读取完毕,那么调用rdbLoad加载文件内容。并考虑重新启动startAppendOnly
4    if (server.repl_transfer_size == -1) {
5        if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
6//`````
7        //先获取RDB文件大小。
8        server.repl_transfer_size = strtol(buf+1,NULL,10);

然后就可以读取RDB文件内容了,其实就是一堆指令。注意redis为了避免阻塞,每次可读回调只读取16K的数据,然后写入RDB临时文件里面,写到一定大小,默认写死为REPL_MAX_WRITTEN_BEFORE_FSYNC 也就是8M,就进行一次刷磁盘的操作sync();避免到最后一次SYNC的时候直接卡死服务器。

1/* Read bulk data */
2left = server.repl_transfer_size - server.repl_transfer_read;
3readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
4nread = read(fd,buf,readlen);//读取一次,有且仅有的读取一次。每次可读事件就读一次。
5if (nread <= 0) {
6    redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
7        (nread == -1) ? strerror(errno) : "connection lost");
8    replicationAbortSyncTransfer();
9    return;
10}
11server.repl_transfer_lastio = server.unixtime;
12if (write(server.repl_transfer_fd,buf,nread) != nread) {//写到临时文件里面去。
13    redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s"strerror(errno));
14    goto error;
15}
16server.repl_transfer_read += nread;//更新读了的数目

如果读取完成了,那么就可以加载数据了。

四、rdbLoad()重新加载新的RDB文件

如果文件全部接收完毕,redis会先清空所有数据emptyDb,然后用rdbLoad加载RDB文件到内存中。设置连接为CONNECTED状态

1/* Check if the transfer is now complete */
2if (server.repl_transfer_read == server.repl_transfer_size) {//看看是否文件全部接收完毕,如果完毕,GOOD
3    if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
4        redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s"strerror(errno));
5        replicationAbortSyncTransfer();
6        return;
7    }
8    redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
9    signalFlushedDb(-1);
10    emptyDb();
11    /* Before loading the DB into memory we need to delete the readable
12     * handler, otherwise it will get called recursively since
13     * rdbLoad() will call the event loop to process events from time to
14     * time for non blocking loading. */
15    aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);//上面注释说了,避免循环进入。
16    //开始加载RDB文件到内存数据结构中,这个要花费不少时间的。
17    if (rdbLoad(server.rdb_filename) != REDIS_OK) {
18        redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
19        replicationAbortSyncTransfer();
20        return;
21    }
22    /* Final setup of the connected slave <- master link */
23    zfree(server.repl_transfer_tmpfile);
24    close(server.repl_transfer_fd);
25    server.master = createClient(server.repl_transfer_s);//重新注册可读事件毁掉为readQueryFromClient
26    server.master->flags |= REDIS_MASTER;
27    server.master->authenticated = 1;
28    server.repl_state = REDIS_REPL_CONNECTED;
29    redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");

当切换server.repl_state 为 REDIS_REPL_CONNECTED的时候,新来的查询请求就能够被处理了,在processCommand里面就不会过滤非STALE请求,同时本slave也能接受下一级slave的SYNC指令了。

后面redis会附带启动AOF,如果需要的话。

五、总结

redis主从同步代码比较简练,不多,但功能该有的都有,很赞的。下面说点缺点:

  1. 不能支持增量同步(这个即将发布的2.8版本已经解决,采用backlog的形式);
  2. 如果系统很大,好几十G的RDB文件,靠一个连接发送RDB文件的话估计得把人耗死,而且更悲剧的问题是:在做RDB快照,以及发送RDB问的过程中,所有客户端的写操作都会记录在内存中,这个对本来内存要求高的redis又增加了负担;
  3. 另外redis的扩容是个问题,那么大的数据量,加载一次RDB文件得好几个小时,简直无法忍受。

不过关于扩容作者在其博客 里面介绍了可用的方法:presharding。不多说,绝对经典。

另外redis的集群化正在开发中,可用在这里看到redis集群化的进度和概况:Redis cluster Specification (work in progress)代码里面也有最新的相关实现了,过段时间看看。