Redis 复制

在 Redis 中,用户可以通过执行 SLAVEOF 命令或者设置 slaveof 选项,让一个服务器去复制(replicate)另一个服务器,我们称呼被复制的服务器为主服务器(master),而对主服务器进行复制的服务器则被称为从服务器(slave)。

REDIS_REPL_CONNECT

// redis.h
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTING 2 /* Connecting to master */
#define REDIS_REPL_RECEIVE_PONG 3 /* Wait for PING reply */
#define REDIS_REPL_TRANSFER 4 /* Receiving .rdb from master */
#define REDIS_REPL_CONNECTED 5 /* Connected to master */

// redis.c
struct redisCommand redisCommandTable[] = {
...
{"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
...
};

// replication.c
void slaveofCommand(redisClient *c) {
...
// SLAVEOF NO ONE 让从服务器转为主服务器
if (!strcasecmp(c->argv[1]->ptr,"no") && !strcasecmp(c->argv[2]->ptr,"one")) {
...
} else {
...
// 开始执行复制操作
replicationSetMaster(c->argv[1]->ptr, port);
...
}
addReply(c,shared.ok);
}

// replication.c
void replicationSetMaster(char *ip, int port) {
// 设置主服务器 IP 和端口
sdsfree(server.masterhost);
server.masterhost = sdsnew(ip);
server.masterport = port;
...
// 进入连接状态
server.repl_state = REDIS_REPL_CONNECT;
...
}

server.repl_state 的默认值是 REDIS_REPL_NONE ,执行 SLAVEOF 命令之后,进入 REDIS_REPL_CONNECT 状态,到这里命令就返回了。

REDIS_REPL_CONNECTING

// redis.c
int main(int argc, char **argv) {
...
// 创建并初始化服务器数据结构
initServer();
...
}

void initServer() {
...
// 为 serverCron() 创建时间事件
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
...
}
...
}

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
// 复制处理函数
run_with_period(1000) replicationCron();
...
return 1000/server.hz;
}

先来看看 Redis 的入口函数,在其中调用了initServer() 方法,这个方法里面将 serverCron() 方法注册为一个时间事件的回调函数,即 1 毫秒之后就会触发执行,之后通过 1000/server.hz 这个返回值控制 1 秒运行 server.hz 次,原理就是在事件处理函数 processTimeEvents() 中会把这个返回值设置为下次触发距离现在的毫秒数,而宏 run_with_period 控制 replicationCron() 函数 1 秒运行一次。该函数中判断如果是 REDIS_REPL_CONNECT 状态,则与主服务器建立连接并注册文件事件回调函数 syncWithMaster ,最后进入 REDIS_REPL_CONNECTING 状态。

// replication.c
void replicationCron(void) {
...
if (server.repl_state == REDIS_REPL_CONNECT) {
...
if (connectWithMaster() == REDIS_OK) {
...
}
}
...
}

int connectWithMaster(void) {
int fd;
fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
// 监听主服务器 fd 的读和写事件,并绑定文件事件处理器
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) {
...
}
...
// 将状态改为已连接
server.repl_state = REDIS_REPL_CONNECTING;
return REDIS_OK;
}

REDIS_REPL_RECEIVE_PONG

站在从服务器的角度,与服务器建立连接成功后,会触发可写事件调用回调函数 syncWithMaster() ,因为接下来的 RDB 文件发送非常耗时,所以需要确认主服务器真的能访问,首先发送 PING 命令,主服务器响应 PONG 之后,触发可读事件再次调用该函数,读取响应确认连接没有问题。

然后从服务器调用 slaveTryPartialResynchronization() 函数,如果是首次同步则向主服务器发送 PSYNC ? -1 进行完整同步,非首次同步则发送 PSYNC <runid> <offset> 试图进行部分同步。然后回到 syncWithMaster() 函数,判断如果返回结果不是 PSYNC_CONTINUE 则表示要进行完整同步:打开一个临时文件用于保存主服务传过来的 RDB 文件数据,并设置可读回调函数 readSyncBulkPayload() 准备读取主服务器发送过来的 RDB 文件数据,进入 REDIS_REPL_TRANSFER 状态。

// replication.c
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
...
if (server.repl_state == REDIS_REPL_CONNECTING) {
...
// 暂时取消监听写事件
aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
// 更新状态
server.repl_state = REDIS_REPL_RECEIVE_PONG;
// 同步发送 PING
syncWrite(fd,"PING\r\n",6,100);
return;
}
if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
char buf[1024];
// 暂时取消监听读事件
aeDeleteFileEvent(server.el,fd,AE_READABLE);
buf[0] = '\0';
// 同步接收 PONG
if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1) {
...
goto error;
}
...
}
...
// 根据返回的结果决定是执行部分 resync 还是 full-resync
psync_result = slaveTryPartialResynchronization(fd);
if (psync_result == PSYNC_CONTINUE) {
...
return;
}
...
// 打开一个临时文件来保存主服务器的 RDB 文件数据
while(maxtries--) {
snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
...
// 设置一个读事件处理器来读取主服务器的 RDB 文件
if (aeCreateFileEvent(server.el,fd,AE_READABLE,readSyncBulkPayload,NULL) == AE_ERR) {
...
goto error;
}
// 更新状态
server.repl_state = REDIS_REPL_TRANSFER;
...
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
error:
close(fd);
server.repl_transfer_s = -1;
server.repl_state = REDIS_REPL_CONNECT;
return;
}

REDIS_REPL_TRANSFER

readSyncBulkPayload() 方法主要用来读取主服务器发送过来的 RDB 文件数据,并定期刷写到磁盘临时文件中,等接收完毕之后重命名为 dump.rdb,首先把数据库清空,然后载入新的 RDB 文件,最后将主服务器设置成自己的一个客户端,准备接收来自主服务器的命令传播,并更新状态为 REDIS_REPL_CONNECTED

// replication.c
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
...
// 检查 RDB 是否已经传送完毕
if (server.repl_transfer_read == server.repl_transfer_size) {
// 临时文件改名为 dump.rdb
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
...
return;
}
// 先清空旧数据库
signalFlushedDb(-1);
emptyDb(replicationEmptyDbCallback);
...
// 再载入 RDB 文件
if (rdbLoad(server.rdb_filename) != REDIS_OK) {
...
return;
}
...
// 将主服务器设置成从服务器的客户端,准备接受命令传播
server.master = createClient(server.repl_transfer_s);
...
// 更新状态
server.repl_state = REDIS_REPL_CONNECTED;
...
}
...
}

再来看主服务器是怎样响应 PSYNC 命令的,syncCommand 就是 SYNCPSYNC 命令的实现函数:如果当前有 BGSAVE 在执行且有至少一个 slave 在等待,如果没有 BGSAVE 在执行则开始一个新的,主服务器在这两种情况下都会进入 REDIS_REPL_WAIT_BGSAVE_END 状态;如果本次 BGSAVE 不能用,则进入 REDIS_REPL_WAIT_BGSAVE_START 状态。

// replication.c
void syncCommand(redisClient *c) {
...
// 检查是否有 BGSAVE 在执行
if (server.rdb_child_pid != -1) {
...
if (ln) {
...
// 如果有至少一个 slave 在等待这个 BGSAVE 完成
// 那么说明正在进行的 BGSAVE 所产生的 RDB 也可以为其他 slave 所用
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
...
} else {
// 不好运的情况,必须等待下个 BGSAVE
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
...
}
} else {
...
// 没有 BGSAVE 在进行,开始一个新的 BGSAVE
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
...
}
// 设置状态
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
...
}
...
}

在定时任务 serverCron() 中,如果检测到 BGSAVE 执行完毕,则可以进入到 updateSlavesWaitingBgsave() 函数:如果在 REDIS_REPL_WAIT_BGSAVE_END 状态下则开始读取并发送 RDB 文件给 slave,REDIS_REPL_WAIT_BGSAVE_START 状态下则需要执行一个新的 BGSAVE ,完成后同样回到该函数发送 RDB 文件,发送完成后主服务器进入 REDIS_REPL_ONLINE 状态。

// redis.c
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
...
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
...
// BGSAVE 执行完毕
if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
}
...
}
...
}
...
}

// rdb.c
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
...
// 处理正在等待 BGSAVE 完成的那些 slave
updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}

// replication.c
void updateSlavesWaitingBgsave(int bgsaveerr) {
...
// 遍历所有 slave
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
// 之前的 RDB 文件不能使用,开始新的 BGSAVE
startbgsave = 1;
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
...
// 打开 RDB 文件
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || redis_fstat(slave->repldbfd,&buf) == -1) {
...
}
...
// 更新状态
slave->replstate = REDIS_REPL_SEND_BULK;
...
// 注册 slave 的写事件处理器,用于发送 RDB 文件
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave,slave) == AE_ERR) {
...
}
}
}

// 需要执行新的 BGSAVE
if (startbgsave) {
...
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
...
}
}
}

// replication.c
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
...
// 如果写入已经完成
if (slave->repldboff == slave->repldbsize) {
...
// 更新状态
slave->replstate = REDIS_REPL_ONLINE;
...
}
...
}

REDIS_REPL_CONNECTED

在完整同步执行完成之后,主从服务器的数据库将达到一致的状态,但当主服务器执行新的写命令后,可能会导致主从服务器状态不再一致。为了让主从状态再次回到一致状态,主服务器需要对从服务器进行命令传播操作:主服务器将自己执行的写命令,发送给从服务器执行。在 processCommand() 函数中用调用 call() 函数执行命令,如果是一个写命令,会调用 propagate() 函数进行命令传播,具体的实现在 replicationFeedSlaves() 函数中:构建命令内容、备份到 backlog、将内容发送给各个从服务器。

// redis.c
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) {
...
// 传播到 slave
if (flags & REDIS_PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

// replication.c
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
...
if (server.repl_backlog) {
// 将命令写入到 backlog
}
listRewind(slaves,&li);
while((ln = listNext(&li))) {
// 指向从服务器
redisClient *slave = ln->value;

// 不要给正在等待 BGSAVE 开始的从服务器发送命令
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

// 向从服务器写入命令
addReplyMultiBulkLen(slave,argc);
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}

当从服务器在断线重连后,向主服务器发送 PSYNC <runid> <offset> 命令,主服务器在 masterTryPartialResynchronization() 函数中判断是否可以进行增量同步:如果 backlog 中包含所有需要同步的命令,主服务器返回 +CONTINUE 表示可以进行增量同步,随后将 backlog 中的命令同步给从服务器,否则返回 +FULLRESYNC <runid> <offset> 进行完整同步。

// replication.c
void syncCommand(redisClient *c) {
...
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// 尝试进行 PSYNC
if (masterTryPartialResynchronization(c) == REDIS_OK) {
...
}
}
...
}
0%