/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
int was_master = server.masterhost == NULL;
//其他代码.....
//配置状态机为 REPL_STATE_CONNECT
server.repl_state = REPL_STATE_CONNECT;
}
/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth. */
/* 1000ms执行一次 replicationCron这个函数 */
run_with_period(1000) replicationCron();
replicationCron(void) {
//.....
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}
}
//....
}
if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
//读取回复
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF capa. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF capa: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PSYNC;
}
syncCommand(client *c) {
// 一系列的判断代码、略过
//因为我们是第一次同步、所以 尝试部分同步会失败、 走到下面的 else 里 stat_sync_partial_err++
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
char *master_replid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
* replid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
* of the replication protocol (like redis-cli --slave). Flag the client
* so that we don't expect to receive REPLCONF ACK feedbacks. */
c->flags |= CLIENT_PRE_PSYNC;
}
//以主机的视角来看的话,这里很多代码是做一些 数据保存、主要把从机的信息保存下来、
/* Setup the slave as one waiting for BGSAVE to start. The following code
* paths will change the state if we handle the slave differently. */
//标识当前这个从机的同步状态、标识从机为CLIENT_SLAVE身份、加入从机列表、
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;//这个打个flag ,下面将会用到。先标识状态
if (server.repl_disable_tcp_nodelay)
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);
//这里创建一个复制积压缓冲区,用于部分同步,稍后讲到、这里打个flag
/* Create the replication backlog if needed. */
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
/* When we create the backlog from scratch, we always use a new
* replication ID and clear the ID2, since there is no valid
* past history. */
changeReplicationId();
clearReplicationId2();
createReplicationBacklog();
}
//下面有三个case 因为我们是第一次请求主机同步。所以没有任务bgsave progress(这里假设,方便我们阅读代码,和顺应场景)
/* CASE 1: BGSAVE is in progress, with disk target. */
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
/* CASE 2: BGSAVE is in progress, with socket target. */
} else if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
/* There is an RDB child process but it is writing directly to
* children sockets. We need to wait for the next BGSAVE
* in order to synchronize. */
serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
/* CASE 3: There is no BGSAVE is progress. */
} else {
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
/* Diskless replication RDB child is created inside
* replicationCron() since we want to delay its start a
* few seconds to wait for more slaves to arrive. */
if (server.repl_diskless_sync_delay)
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
} else {
/* Target is disk (or the slave is not capable of supporting
* diskless replication) and we don't have a BGSAVE in progress,
* let's start one. */
if (server.aof_child_pid == -1) {
//开始为同步进行Bgsave操作
startBgsaveForReplication(c->slave_capa);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but an AOF rewrite is active. "
"BGSAVE for replication delayed");
}
}
}
return;
}
replicationSetupSlaveForFullResync(client *slave, long long offset) {
char buf[128];
int buflen;
//这个函数很简单了、配置从机和复制偏移量、配置从机的复制状态、
slave->psync_initial_offset = offset;
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
/* We are going to accumulate the incremental changes for this
* slave as well. Set slaveseldb to -1 in order to force to re-emit
* a SELECT statement in the replication stream. */
server.slaveseldb = -1;
/* Don't send this reply to slaves that approached us with
* the old SYNC command. */
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
server.replid,offset);
//写入数据、 FULLRESYNC replid offset
if (write(slave->fd,buf,buflen) != buflen) {
freeClientAsync(slave);
return C_ERR;
}
}
return C_OK;
}
_result = slaveTryPartialResynchronization(fd,1);
if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
return;
}
slaveTryPartialResynchronization(int fd, int read_reply) {
/* Reading half */
//读取数据
reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (sdslen(reply) == 0) {
/* The master may send empty newlines after it receives PSYNC
* and before to reply, just to keep the connection alive. */
sdsfree(reply);
return PSYNC_WAIT_REPLY;
}
aeDeleteFileEvent(server.el,fd,AE_READABLE);
//如果是FULLRESYNC
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *replid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset. */
replid = strchr(reply,' ');
if (replid) {
replid++;
offset = strchr(replid,' ');
if (offset) offset++;
}
if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* replid to make sure next PSYNCs will fail. */
memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
} else {
//记录返回来的replid 和 offset
memcpy(server.master_replid, replid, offset-replid-1);
server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
server.master_replid,
server.master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
//清空已经存在的主机信息
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
}
syncCommand(client *c) {
//主机在解析从机发来 PSYNC 命令时、
//当有一个从机的时候、并且 back_log为null的时候
/* Create the replication backlog if needed. */
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
/* When we create the backlog from scratch, we always use a new
* replication ID and clear the ID2, since there is no valid
* past history. */
changeReplicationId();
clearReplicationId2();
//创建 backlogBuffer
createReplicationBacklog();
}
}
//创建复制积压缓冲区
void createReplicationBacklog(void) {
serverAssert(server.repl_backlog == NULL);
//配置文件配置大小、defult 1M
server.repl_backlog = zmalloc(server.repl_backlog_size);
//实际数据长度
server.repl_backlog_histlen = 0;
//下一次写入命令的的位置
server.repl_backlog_idx = 0;
/* We don't have any data inside our buffer, but virtually the first
* byte we have is the next byte that will be generated for the
* replication stream. */
server.repl_backlog_off = server.master_repl_offset+1;
}
syncCommand(client *c) {
//其他代码...
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// 在这里、这个函数看函数名!
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
char *master_replid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
* replid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
}
//其他代码...
}
int masterTryPartialResynchronization(client *c) {
//其他代码...
/* We still have the data our slave is asking for? */
//这个翻译就是 。我们有从机要的数据!。。。。意思就是我有备份、不用生成RDB!
//backlogBuffer不存在
//或者、传递过来的偏移量 < 当前主机的偏移量 (意思是主机跑的太快了、覆盖了一些数据)
//或者是当前从机比主机跑的还数据还要多
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
serverLog(LL_NOTICE,
"Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
if (psync_offset > server.master_repl_offset) {
serverLog(LL_WARNING,
"Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
}
//否则就走全量同步
goto need_full_resync;
}
//其他代码...
//发送在缓冲区的数据
psync_len = addReplyReplicationBacklog(c,psync_offset);
}
masterTryPartialResynchronization(client *c) {
//其他代码
/* Is the replication ID of this master the same advertised by the wannabe
* slave via PSYNC? If the replication ID changed this master has a
* different replication history, and there is no way to continue.
*
* Note that there are two potentially valid replication IDs: the ID1
* and the ID2. The ID2 however is only valid up to a specific offset. */
//我们说了,当6380被选为主机的时候,备份了原来的 replid 和 offset! 这一点至关重要
if (strcasecmp(master_replid, server.replid) &&
// 因为这里支持了对 replid2的判断,即我还记得你,你之前是我爸爸,但是,你不能比我跑的快、
//不然不好意思、去全量同步我的!
(strcasecmp(master_replid, server.replid2) ||
psync_offset > server.second_replid_offset))
{
/* Run id "?" is used by slaves that want to force a full resync. */
if (master_replid[0] != '?') {
if (strcasecmp(master_replid, server.replid) &&
strcasecmp(master_replid, server.replid2))
{
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Replication ID mismatch (Replica asked for '%s', my "
"replication IDs are '%s' and '%s')",
master_replid, server.replid, server.replid2);
} else {
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Requested offset for second ID was %lld, but I can reply "
"up to %lld", psync_offset, server.second_replid_offset);
}
} else {
serverLog(LL_NOTICE,"Full resync requested by replica %s",
replicationGetSlaveName(c));
}
goto need_full_resync;
}
}