1 /* Asynchronous replication implementation. 2 * 3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * * Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * * Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * * Neither the name of Redis nor the names of its contributors may be used 15 * to endorse or promote products derived from this software without 16 * specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 32 #include "server.h" 33 34 #include <sys/time.h> 35 #include <unistd.h> 36 #include <fcntl.h> 37 #include <sys/socket.h> 38 #include <sys/stat.h> 39 40 void replicationDiscardCachedMaster(void); 41 void replicationResurrectCachedMaster(int newfd); 42 void replicationSendAck(void); 43 void putSlaveOnline(client *slave); 44 int cancelReplicationHandshake(void); 45 46 /* --------------------------- Utility functions ---------------------------- */ 47 48 /* Return the pointer to a string representing the slave ip:listening_port 49 * pair. Mostly useful for logging, since we want to log a slave using its 50 * IP address and it's listening port which is more clear for the user, for 51 * example: "Closing connection with slave 10.1.2.3:6380". */ 52 char *replicationGetSlaveName(client *c) { 53 static char buf[NET_PEER_ID_LEN]; 54 char ip[NET_IP_STR_LEN]; 55 56 ip[0] = '\0'; 57 buf[0] = '\0'; 58 if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) { 59 if (c->slave_listening_port) 60 anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port); 61 else 62 snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip); 63 } else { 64 snprintf(buf,sizeof(buf),"client id #%llu", 65 (unsigned long long) c->id); 66 } 67 return buf; 68 } 69 70 /* ---------------------------------- MASTER -------------------------------- */ 71 72 void createReplicationBacklog(void) { 73 serverAssert(server.repl_backlog == NULL); 74 server.repl_backlog = zmalloc(server.repl_backlog_size); 75 server.repl_backlog_histlen = 0; 76 server.repl_backlog_idx = 0; 77 /* When a new backlog buffer is created, we increment the replication 78 * offset by one to make sure we'll not be able to PSYNC with any 79 * previous slave. This is needed because we avoid incrementing the 80 * master_repl_offset if no backlog exists nor slaves are attached. */ 81 server.master_repl_offset++; 82 83 /* We don't have any data inside our buffer, but virtually the first 84 * byte we have is the next byte that will be generated for the 85 * replication stream. */ 86 server.repl_backlog_off = server.master_repl_offset+1; 87 } 88 89 /* This function is called when the user modifies the replication backlog 90 * size at runtime. It is up to the function to both update the 91 * server.repl_backlog_size and to resize the buffer and setup it so that 92 * it contains the same data as the previous one (possibly less data, but 93 * the most recent bytes, or the same data and more free space in case the 94 * buffer is enlarged). */ 95 void resizeReplicationBacklog(long long newsize) { 96 if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE) 97 newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; 98 if (server.repl_backlog_size == newsize) return; 99 100 server.repl_backlog_size = newsize; 101 if (server.repl_backlog != NULL) { 102 /* What we actually do is to flush the old buffer and realloc a new 103 * empty one. It will refill with new data incrementally. 104 * The reason is that copying a few gigabytes adds latency and even 105 * worse often we need to alloc additional space before freeing the 106 * old buffer. */ 107 zfree(server.repl_backlog); 108 server.repl_backlog = zmalloc(server.repl_backlog_size); 109 server.repl_backlog_histlen = 0; 110 server.repl_backlog_idx = 0; 111 /* Next byte we have is... the next since the buffer is empty. */ 112 server.repl_backlog_off = server.master_repl_offset+1; 113 } 114 } 115 116 void freeReplicationBacklog(void) { 117 serverAssert(listLength(server.slaves) == 0); 118 zfree(server.repl_backlog); 119 server.repl_backlog = NULL; 120 } 121 122 /* Add data to the replication backlog. 123 * This function also increments the global replication offset stored at 124 * server.master_repl_offset, because there is no case where we want to feed 125 * the backlog without incrementing the buffer. */ 126 void feedReplicationBacklog(void *ptr, size_t len) { 127 unsigned char *p = ptr; 128 129 server.master_repl_offset += len; 130 131 /* This is a circular buffer, so write as much data we can at every 132 * iteration and rewind the "idx" index if we reach the limit. */ 133 while(len) { 134 size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; 135 if (thislen > len) thislen = len; 136 memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); 137 server.repl_backlog_idx += thislen; 138 if (server.repl_backlog_idx == server.repl_backlog_size) 139 server.repl_backlog_idx = 0; 140 len -= thislen; 141 p += thislen; 142 server.repl_backlog_histlen += thislen; 143 } 144 if (server.repl_backlog_histlen > server.repl_backlog_size) 145 server.repl_backlog_histlen = server.repl_backlog_size; 146 /* Set the offset of the first byte we have in the backlog. */ 147 server.repl_backlog_off = server.master_repl_offset - 148 server.repl_backlog_histlen + 1; 149 } 150 151 /* Wrapper for feedReplicationBacklog() that takes Redis string objects 152 * as input. */ 153 void feedReplicationBacklogWithObject(robj *o) { 154 char llstr[LONG_STR_SIZE]; 155 void *p; 156 size_t len; 157 158 if (o->encoding == OBJ_ENCODING_INT) { 159 len = ll2string(llstr,sizeof(llstr),(long)o->ptr); 160 p = llstr; 161 } else { 162 len = sdslen(o->ptr); 163 p = o->ptr; 164 } 165 feedReplicationBacklog(p,len); 166 } 167 168 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { 169 listNode *ln; 170 listIter li; 171 int j, len; 172 char llstr[LONG_STR_SIZE]; 173 174 /* If there aren't slaves, and there is no backlog buffer to populate, 175 * we can return ASAP. */ 176 if (server.repl_backlog == NULL && listLength(slaves) == 0) return; 177 178 /* We can't have slaves attached and no backlog. */ 179 serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); 180 181 /* Send SELECT command to every slave if needed. */ 182 if (server.slaveseldb != dictid) { 183 robj *selectcmd; 184 185 /* For a few DBs we have pre-computed SELECT command. */ 186 if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { 187 selectcmd = shared.select[dictid]; 188 } else { 189 int dictid_len; 190 191 dictid_len = ll2string(llstr,sizeof(llstr),dictid); 192 selectcmd = createObject(OBJ_STRING, 193 sdscatprintf(sdsempty(), 194 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", 195 dictid_len, llstr)); 196 } 197 198 /* Add the SELECT command into the backlog. */ 199 if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); 200 201 /* Send it to slaves. */ 202 listRewind(slaves,&li); 203 while((ln = listNext(&li))) { 204 client *slave = ln->value; 205 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; 206 addReply(slave,selectcmd); 207 } 208 209 if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) 210 decrRefCount(selectcmd); 211 } 212 server.slaveseldb = dictid; 213 214 /* Write the command to the replication backlog if any. */ 215 if (server.repl_backlog) { 216 char aux[LONG_STR_SIZE+3]; 217 218 /* Add the multi bulk reply length. */ 219 aux[0] = '*'; 220 len = ll2string(aux+1,sizeof(aux)-1,argc); 221 aux[len+1] = '\r'; 222 aux[len+2] = '\n'; 223 feedReplicationBacklog(aux,len+3); 224 225 for (j = 0; j < argc; j++) { 226 long objlen = stringObjectLen(argv[j]); 227 228 /* We need to feed the buffer with the object as a bulk reply 229 * not just as a plain string, so create the $..CRLF payload len 230 * and add the final CRLF */ 231 aux[0] = '$'; 232 len = ll2string(aux+1,sizeof(aux)-1,objlen); 233 aux[len+1] = '\r'; 234 aux[len+2] = '\n'; 235 feedReplicationBacklog(aux,len+3); 236 feedReplicationBacklogWithObject(argv[j]); 237 feedReplicationBacklog(aux+len+1,2); 238 } 239 } 240 241 /* Write the command to every slave. */ 242 listRewind(server.slaves,&li); 243 while((ln = listNext(&li))) { 244 client *slave = ln->value; 245 246 /* Don't feed slaves that are still waiting for BGSAVE to start */ 247 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; 248 249 /* Feed slaves that are waiting for the initial SYNC (so these commands 250 * are queued in the output buffer until the initial SYNC completes), 251 * or are already in sync with the master. */ 252 253 /* Add the multi bulk length. */ 254 addReplyMultiBulkLen(slave,argc); 255 256 /* Finally any additional argument that was not stored inside the 257 * static buffer if any (from j to argc). */ 258 for (j = 0; j < argc; j++) 259 addReplyBulk(slave,argv[j]); 260 } 261 } 262 263 void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { 264 listNode *ln; 265 listIter li; 266 int j; 267 sds cmdrepr = sdsnew("+"); 268 robj *cmdobj; 269 struct timeval tv; 270 271 gettimeofday(&tv,NULL); 272 cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); 273 if (c->flags & CLIENT_LUA) { 274 cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); 275 } else if (c->flags & CLIENT_UNIX_SOCKET) { 276 cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); 277 } else { 278 cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c)); 279 } 280 281 for (j = 0; j < argc; j++) { 282 if (argv[j]->encoding == OBJ_ENCODING_INT) { 283 cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr); 284 } else { 285 cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, 286 sdslen(argv[j]->ptr)); 287 } 288 if (j != argc-1) 289 cmdrepr = sdscatlen(cmdrepr," ",1); 290 } 291 cmdrepr = sdscatlen(cmdrepr,"\r\n",2); 292 cmdobj = createObject(OBJ_STRING,cmdrepr); 293 294 listRewind(monitors,&li); 295 while((ln = listNext(&li))) { 296 client *monitor = ln->value; 297 addReply(monitor,cmdobj); 298 } 299 decrRefCount(cmdobj); 300 } 301 302 /* Feed the slave 'c' with the replication backlog starting from the 303 * specified 'offset' up to the end of the backlog. */ 304 long long addReplyReplicationBacklog(client *c, long long offset) { 305 long long j, skip, len; 306 307 serverLog(LL_DEBUG, "[PSYNC] Slave request offset: %lld", offset); 308 309 if (server.repl_backlog_histlen == 0) { 310 serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); 311 return 0; 312 } 313 314 serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", 315 server.repl_backlog_size); 316 serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", 317 server.repl_backlog_off); 318 serverLog(LL_DEBUG, "[PSYNC] History len: %lld", 319 server.repl_backlog_histlen); 320 serverLog(LL_DEBUG, "[PSYNC] Current index: %lld", 321 server.repl_backlog_idx); 322 323 /* Compute the amount of bytes we need to discard. */ 324 skip = offset - server.repl_backlog_off; 325 serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); 326 327 /* Point j to the oldest byte, that is actaully our 328 * server.repl_backlog_off byte. */ 329 j = (server.repl_backlog_idx + 330 (server.repl_backlog_size-server.repl_backlog_histlen)) % 331 server.repl_backlog_size; 332 serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j); 333 334 /* Discard the amount of data to seek to the specified 'offset'. */ 335 j = (j + skip) % server.repl_backlog_size; 336 337 /* Feed slave with data. Since it is a circular buffer we have to 338 * split the reply in two parts if we are cross-boundary. */ 339 len = server.repl_backlog_histlen - skip; 340 serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); 341 while(len) { 342 long long thislen = 343 ((server.repl_backlog_size - j) < len) ? 344 (server.repl_backlog_size - j) : len; 345 346 serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen); 347 addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); 348 len -= thislen; 349 j = 0; 350 } 351 return server.repl_backlog_histlen - skip; 352 } 353 354 /* Return the offset to provide as reply to the PSYNC command received 355 * from the slave. The returned value is only valid immediately after 356 * the BGSAVE process started and before executing any other command 357 * from clients. */ 358 long long getPsyncInitialOffset(void) { 359 long long psync_offset = server.master_repl_offset; 360 /* Add 1 to psync_offset if it the replication backlog does not exists 361 * as when it will be created later we'll increment the offset by one. */ 362 if (server.repl_backlog == NULL) psync_offset++; 363 return psync_offset; 364 } 365 366 /* Send a FULLRESYNC reply in the specific case of a full resynchronization, 367 * as a side effect setup the slave for a full sync in different ways: 368 * 369 * 1) Remember, into the slave client structure, the offset we sent 370 * here, so that if new slaves will later attach to the same 371 * background RDB saving process (by duplicating this client output 372 * buffer), we can get the right offset from this slave. 373 * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that 374 * we start accumulating differences from this point. 375 * 3) Force the replication stream to re-emit a SELECT statement so 376 * the new slave incremental differences will start selecting the 377 * right database number. 378 * 379 * Normally this function should be called immediately after a successful 380 * BGSAVE for replication was started, or when there is one already in 381 * progress that we attached our slave to. */ 382 int replicationSetupSlaveForFullResync(client *slave, long long offset) { 383 char buf[128]; 384 int buflen; 385 386 slave->psync_initial_offset = offset; 387 slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; 388 /* We are going to accumulate the incremental changes for this 389 * slave as well. Set slaveseldb to -1 in order to force to re-emit 390 * a SLEECT statement in the replication stream. */ 391 server.slaveseldb = -1; 392 393 /* Don't send this reply to slaves that approached us with 394 * the old SYNC command. */ 395 if (!(slave->flags & CLIENT_PRE_PSYNC)) { 396 buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", 397 server.runid,offset); 398 if (write(slave->fd,buf,buflen) != buflen) { 399 freeClientAsync(slave); 400 return C_ERR; 401 } 402 } 403 return C_OK; 404 } 405 406 /* This function handles the PSYNC command from the point of view of a 407 * master receiving a request for partial resynchronization. 408 * 409 * On success return C_OK, otherwise C_ERR is returned and we proceed 410 * with the usual full resync. */ 411 int masterTryPartialResynchronization(client *c) { 412 long long psync_offset, psync_len; 413 char *master_runid = c->argv[1]->ptr; 414 char buf[128]; 415 int buflen; 416 417 /* Is the runid of this master the same advertised by the wannabe slave 418 * via PSYNC? If runid changed this master is a different instance and 419 * there is no way to continue. */ 420 if (strcasecmp(master_runid, server.runid)) { 421 /* Run id "?" is used by slaves that want to force a full resync. */ 422 if (master_runid[0] != '?') { 423 serverLog(LL_NOTICE,"Partial resynchronization not accepted: " 424 "Runid mismatch (Client asked for runid '%s', my runid is '%s')", 425 master_runid, server.runid); 426 } else { 427 serverLog(LL_NOTICE,"Full resync requested by slave %s", 428 replicationGetSlaveName(c)); 429 } 430 goto need_full_resync; 431 } 432 433 /* We still have the data our slave is asking for? */ 434 if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != 435 C_OK) goto need_full_resync; 436 if (!server.repl_backlog || 437 psync_offset < server.repl_backlog_off || 438 psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) 439 { 440 serverLog(LL_NOTICE, 441 "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset); 442 if (psync_offset > server.master_repl_offset) { 443 serverLog(LL_WARNING, 444 "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); 445 } 446 goto need_full_resync; 447 } 448 449 /* If we reached this point, we are able to perform a partial resync: 450 * 1) Set client state to make it a slave. 451 * 2) Inform the client we can continue with +CONTINUE 452 * 3) Send the backlog data (from the offset to the end) to the slave. */ 453 c->flags |= CLIENT_SLAVE; 454 c->replstate = SLAVE_STATE_ONLINE; 455 c->repl_ack_time = server.unixtime; 456 c->repl_put_online_on_ack = 0; 457 listAddNodeTail(server.slaves,c); 458 /* We can't use the connection buffers since they are used to accumulate 459 * new commands at this stage. But we are sure the socket send buffer is 460 * empty so this write will never fail actually. */ 461 buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); 462 if (write(c->fd,buf,buflen) != buflen) { 463 freeClientAsync(c); 464 return C_OK; 465 } 466 psync_len = addReplyReplicationBacklog(c,psync_offset); 467 serverLog(LL_NOTICE, 468 "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", 469 replicationGetSlaveName(c), 470 psync_len, psync_offset); 471 /* Note that we don't need to set the selected DB at server.slaveseldb 472 * to -1 to force the master to emit SELECT, since the slave already 473 * has this state from the previous connection with the master. */ 474 475 refreshGoodSlavesCount(); 476 return C_OK; /* The caller can return, no full resync needed. */ 477 478 need_full_resync: 479 /* We need a full resync for some reason... Note that we can't 480 * reply to PSYNC right now if a full SYNC is needed. The reply 481 * must include the master offset at the time the RDB file we transfer 482 * is generated, so we need to delay the reply to that moment. */ 483 return C_ERR; 484 } 485 486 /* Start a BGSAVE for replication goals, which is, selecting the disk or 487 * socket target depending on the configuration, and making sure that 488 * the script cache is flushed before to start. 489 * 490 * The mincapa argument is the bitwise AND among all the slaves capabilities 491 * of the slaves waiting for this BGSAVE, so represents the slave capabilities 492 * all the slaves support. Can be tested via SLAVE_CAPA_* macros. 493 * 494 * Side effects, other than starting a BGSAVE: 495 * 496 * 1) Handle the slaves in WAIT_START state, by preparing them for a full 497 * sync if the BGSAVE was succesfully started, or sending them an error 498 * and dropping them from the list of slaves. 499 * 500 * 2) Flush the Lua scripting script cache if the BGSAVE was actually 501 * started. 502 * 503 * Returns C_OK on success or C_ERR otherwise. */ 504 int startBgsaveForReplication(int mincapa) { 505 int retval; 506 int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); 507 listIter li; 508 listNode *ln; 509 510 serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s", 511 socket_target ? "slaves sockets" : "disk"); 512 513 if (socket_target) 514 retval = rdbSaveToSlavesSockets(); 515 else 516 retval = rdbSaveBackground(server.rdb_filename); 517 518 /* If we failed to BGSAVE, remove the slaves waiting for a full 519 * resynchorinization from the list of salves, inform them with 520 * an error about what happened, close the connection ASAP. */ 521 if (retval == C_ERR) { 522 serverLog(LL_WARNING,"BGSAVE for replication failed"); 523 listRewind(server.slaves,&li); 524 while((ln = listNext(&li))) { 525 client *slave = ln->value; 526 527 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { 528 slave->flags &= ~CLIENT_SLAVE; 529 listDelNode(server.slaves,ln); 530 addReplyError(slave, 531 "BGSAVE failed, replication can't continue"); 532 slave->flags |= CLIENT_CLOSE_AFTER_REPLY; 533 } 534 } 535 return retval; 536 } 537 538 /* If the target is socket, rdbSaveToSlavesSockets() already setup 539 * the salves for a full resync. Otherwise for disk target do it now.*/ 540 if (!socket_target) { 541 listRewind(server.slaves,&li); 542 while((ln = listNext(&li))) { 543 client *slave = ln->value; 544 545 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { 546 replicationSetupSlaveForFullResync(slave, 547 getPsyncInitialOffset()); 548 } 549 } 550 } 551 552 /* Flush the script cache, since we need that slave differences are 553 * accumulated without requiring slaves to match our cached scripts. */ 554 if (retval == C_OK) replicationScriptCacheFlush(); 555 return retval; 556 } 557 558 /* SYNC and PSYNC command implemenation. */ 559 void syncCommand(client *c) { 560 /* ignore SYNC if already slave or in monitor mode */ 561 if (c->flags & CLIENT_SLAVE) return; 562 563 /* Refuse SYNC requests if we are a slave but the link with our master 564 * is not ok... */ 565 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { 566 addReplyError(c,"Can't SYNC while not connected with my master"); 567 return; 568 } 569 570 /* SYNC can't be issued when the server has pending data to send to 571 * the client about already issued commands. We need a fresh reply 572 * buffer registering the differences between the BGSAVE and the current 573 * dataset, so that we can copy to other slaves if needed. */ 574 if (clientHasPendingReplies(c)) { 575 addReplyError(c,"SYNC and PSYNC are invalid with pending output"); 576 return; 577 } 578 579 serverLog(LL_NOTICE,"Slave %s asks for synchronization", 580 replicationGetSlaveName(c)); 581 582 /* Try a partial resynchronization if this is a PSYNC command. 583 * If it fails, we continue with usual full resynchronization, however 584 * when this happens masterTryPartialResynchronization() already 585 * replied with: 586 * 587 * +FULLRESYNC <runid> <offset> 588 * 589 * So the slave knows the new runid and offset to try a PSYNC later 590 * if the connection with the master is lost. */ 591 if (!strcasecmp(c->argv[0]->ptr,"psync")) { 592 if (masterTryPartialResynchronization(c) == C_OK) { 593 server.stat_sync_partial_ok++; 594 return; /* No full resync needed, return. */ 595 } else { 596 char *master_runid = c->argv[1]->ptr; 597 598 /* Increment stats for failed PSYNCs, but only if the 599 * runid is not "?", as this is used by slaves to force a full 600 * resync on purpose when they are not albe to partially 601 * resync. */ 602 if (master_runid[0] != '?') server.stat_sync_partial_err++; 603 } 604 } else { 605 /* If a slave uses SYNC, we are dealing with an old implementation 606 * of the replication protocol (like redis-cli --slave). Flag the client 607 * so that we don't expect to receive REPLCONF ACK feedbacks. */ 608 c->flags |= CLIENT_PRE_PSYNC; 609 } 610 611 /* Full resynchronization. */ 612 server.stat_sync_full++; 613 614 /* Setup the slave as one waiting for BGSAVE to start. The following code 615 * paths will change the state if we handle the slave differently. */ 616 c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; 617 if (server.repl_disable_tcp_nodelay) 618 anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */ 619 c->repldbfd = -1; 620 c->flags |= CLIENT_SLAVE; 621 listAddNodeTail(server.slaves,c); 622 623 /* CASE 1: BGSAVE is in progress, with disk target. */ 624 if (server.rdb_child_pid != -1 && 625 server.rdb_child_type == RDB_CHILD_TYPE_DISK) 626 { 627 /* Ok a background save is in progress. Let's check if it is a good 628 * one for replication, i.e. if there is another slave that is 629 * registering differences since the server forked to save. */ 630 client *slave; 631 listNode *ln; 632 listIter li; 633 634 listRewind(server.slaves,&li); 635 while((ln = listNext(&li))) { 636 slave = ln->value; 637 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; 638 } 639 /* To attach this slave, we check that it has at least all the 640 * capabilities of the slave that triggered the current BGSAVE. */ 641 if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { 642 /* Perfect, the server is already registering differences for 643 * another slave. Set the right state, and copy the buffer. */ 644 copyClientOutputBuffer(c,slave); 645 replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); 646 serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); 647 } else { 648 /* No way, we need to wait for the next BGSAVE in order to 649 * register differences. */ 650 serverLog(LL_NOTICE,"Can't attach the slave to the current BGSAVE. Waiting for next BGSAVE for SYNC"); 651 } 652 653 /* CASE 2: BGSAVE is in progress, with socket target. */ 654 } else if (server.rdb_child_pid != -1 && 655 server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) 656 { 657 /* There is an RDB child process but it is writing directly to 658 * children sockets. We need to wait for the next BGSAVE 659 * in order to synchronize. */ 660 serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC"); 661 662 /* CASE 3: There is no BGSAVE is progress. */ 663 } else { 664 if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) { 665 /* Diskless replication RDB child is created inside 666 * replicationCron() since we want to delay its start a 667 * few seconds to wait for more slaves to arrive. */ 668 if (server.repl_diskless_sync_delay) 669 serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC"); 670 } else { 671 /* Target is disk (or the slave is not capable of supporting 672 * diskless replication) and we don't have a BGSAVE in progress, 673 * let's start one. */ 674 if (server.aof_child_pid != -1) { 675 startBgsaveForReplication(c->slave_capa); 676 } else { 677 serverLog(LL_NOTICE, 678 "No BGSAVE in progress, but an AOF rewrite is active. " 679 "BGSAVE for replication delayed"); 680 } 681 } 682 } 683 684 if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) 685 createReplicationBacklog(); 686 return; 687 } 688 689 /* REPLCONF <option> <value> <option> <value> ... 690 * This command is used by a slave in order to configure the replication 691 * process before starting it with the SYNC command. 692 * 693 * Currently the only use of this command is to communicate to the master 694 * what is the listening port of the Slave redis instance, so that the 695 * master can accurately list slaves and their listening ports in 696 * the INFO output. 697 * 698 * In the future the same command can be used in order to configure 699 * the replication to initiate an incremental replication instead of a 700 * full resync. */ 701 void replconfCommand(client *c) { 702 int j; 703 704 if ((c->argc % 2) == 0) { 705 /* Number of arguments must be odd to make sure that every 706 * option has a corresponding value. */ 707 addReply(c,shared.syntaxerr); 708 return; 709 } 710 711 /* Process every option-value pair. */ 712 for (j = 1; j < c->argc; j+=2) { 713 if (!strcasecmp(c->argv[j]->ptr,"listening-port")) { 714 long port; 715 716 if ((getLongFromObjectOrReply(c,c->argv[j+1], 717 &port,NULL) != C_OK)) 718 return; 719 c->slave_listening_port = port; 720 } else if (!strcasecmp(c->argv[j]->ptr,"capa")) { 721 /* Ignore capabilities not understood by this master. */ 722 if (!strcasecmp(c->argv[j+1]->ptr,"eof")) 723 c->slave_capa |= SLAVE_CAPA_EOF; 724 } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { 725 /* REPLCONF ACK is used by slave to inform the master the amount 726 * of replication stream that it processed so far. It is an 727 * internal only command that normal clients should never use. */ 728 long long offset; 729 730 if (!(c->flags & CLIENT_SLAVE)) return; 731 if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK)) 732 return; 733 if (offset > c->repl_ack_off) 734 c->repl_ack_off = offset; 735 c->repl_ack_time = server.unixtime; 736 /* If this was a diskless replication, we need to really put 737 * the slave online when the first ACK is received (which 738 * confirms slave is online and ready to get more data). */ 739 if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE) 740 putSlaveOnline(c); 741 /* Note: this command does not reply anything! */ 742 return; 743 } else if (!strcasecmp(c->argv[j]->ptr,"getack")) { 744 /* REPLCONF GETACK is used in order to request an ACK ASAP 745 * to the slave. */ 746 if (server.masterhost && server.master) replicationSendAck(); 747 /* Note: this command does not reply anything! */ 748 } else { 749 addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", 750 (char*)c->argv[j]->ptr); 751 return; 752 } 753 } 754 addReply(c,shared.ok); 755 } 756 757 /* This function puts a slave in the online state, and should be called just 758 * after a slave received the RDB file for the initial synchronization, and 759 * we are finally ready to send the incremental stream of commands. 760 * 761 * It does a few things: 762 * 763 * 1) Put the slave in ONLINE state (useless when the function is called 764 * because state is already ONLINE but repl_put_online_on_ack is true). 765 * 2) Make sure the writable event is re-installed, since calling the SYNC 766 * command disables it, so that we can accumulate output buffer without 767 * sending it to the slave. 768 * 3) Update the count of good slaves. */ 769 void putSlaveOnline(client *slave) { 770 slave->replstate = SLAVE_STATE_ONLINE; 771 slave->repl_put_online_on_ack = 0; 772 slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */ 773 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, 774 sendReplyToClient, slave) == AE_ERR) { 775 serverLog(LL_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno)); 776 freeClient(slave); 777 return; 778 } 779 refreshGoodSlavesCount(); 780 serverLog(LL_NOTICE,"Synchronization with slave %s succeeded", 781 replicationGetSlaveName(slave)); 782 } 783 784 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { 785 client *slave = privdata; 786 UNUSED(el); 787 UNUSED(mask); 788 char buf[PROTO_IOBUF_LEN]; 789 ssize_t nwritten, buflen; 790 791 /* Before sending the RDB file, we send the preamble as configured by the 792 * replication process. Currently the preamble is just the bulk count of 793 * the file in the form "$<length>\r\n". */ 794 if (slave->replpreamble) { 795 nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble)); 796 if (nwritten == -1) { 797 serverLog(LL_VERBOSE,"Write error sending RDB preamble to slave: %s", 798 strerror(errno)); 799 freeClient(slave); 800 return; 801 } 802 server.stat_net_output_bytes += nwritten; 803 sdsrange(slave->replpreamble,nwritten,-1); 804 if (sdslen(slave->replpreamble) == 0) { 805 sdsfree(slave->replpreamble); 806 slave->replpreamble = NULL; 807 /* fall through sending data. */ 808 } else { 809 return; 810 } 811 } 812 813 /* If the preamble was already transfered, send the RDB bulk data. */ 814 lseek(slave->repldbfd,slave->repldboff,SEEK_SET); 815 buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN); 816 if (buflen <= 0) { 817 serverLog(LL_WARNING,"Read error sending DB to slave: %s", 818 (buflen == 0) ? "premature EOF" : strerror(errno)); 819 freeClient(slave); 820 return; 821 } 822 if ((nwritten = write(fd,buf,buflen)) == -1) { 823 if (errno != EAGAIN) { 824 serverLog(LL_WARNING,"Write error sending DB to slave: %s", 825 strerror(errno)); 826 freeClient(slave); 827 } 828 return; 829 } 830 slave->repldboff += nwritten; 831 server.stat_net_output_bytes += nwritten; 832 if (slave->repldboff == slave->repldbsize) { 833 close(slave->repldbfd); 834 slave->repldbfd = -1; 835 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 836 putSlaveOnline(slave); 837 } 838 } 839 840 /* This function is called at the end of every background saving, 841 * or when the replication RDB transfer strategy is modified from 842 * disk to socket or the other way around. 843 * 844 * The goal of this function is to handle slaves waiting for a successful 845 * background saving in order to perform non-blocking synchronization, and 846 * to schedule a new BGSAVE if there are slaves that attached while a 847 * BGSAVE was in progress, but it was not a good one for replication (no 848 * other slave was accumulating differences). 849 * 850 * The argument bgsaveerr is C_OK if the background saving succeeded 851 * otherwise C_ERR is passed to the function. 852 * The 'type' argument is the type of the child that terminated 853 * (if it had a disk or socket target). */ 854 void updateSlavesWaitingBgsave(int bgsaveerr, int type) { 855 listNode *ln; 856 int startbgsave = 0; 857 int mincapa = -1; 858 listIter li; 859 860 listRewind(server.slaves,&li); 861 while((ln = listNext(&li))) { 862 client *slave = ln->value; 863 864 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { 865 startbgsave = 1; 866 mincapa = (mincapa == -1) ? slave->slave_capa : 867 (mincapa & slave->slave_capa); 868 } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { 869 struct redis_stat buf; 870 871 /* If this was an RDB on disk save, we have to prepare to send 872 * the RDB from disk to the slave socket. Otherwise if this was 873 * already an RDB -> Slaves socket transfer, used in the case of 874 * diskless replication, our work is trivial, we can just put 875 * the slave online. */ 876 if (type == RDB_CHILD_TYPE_SOCKET) { 877 serverLog(LL_NOTICE, 878 "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming", 879 replicationGetSlaveName(slave)); 880 /* Note: we wait for a REPLCONF ACK message from slave in 881 * order to really put it online (install the write handler 882 * so that the accumulated data can be transfered). However 883 * we change the replication state ASAP, since our slave 884 * is technically online now. */ 885 slave->replstate = SLAVE_STATE_ONLINE; 886 slave->repl_put_online_on_ack = 1; 887 slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */ 888 } else { 889 if (bgsaveerr != C_OK) { 890 freeClient(slave); 891 serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error"); 892 continue; 893 } 894 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || 895 redis_fstat(slave->repldbfd,&buf) == -1) { 896 freeClient(slave); 897 serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); 898 continue; 899 } 900 slave->repldboff = 0; 901 slave->repldbsize = buf.st_size; 902 slave->replstate = SLAVE_STATE_SEND_BULK; 903 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", 904 (unsigned long long) slave->repldbsize); 905 906 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 907 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { 908 freeClient(slave); 909 continue; 910 } 911 } 912 } 913 } 914 if (startbgsave) startBgsaveForReplication(mincapa); 915 } 916 917 /* ----------------------------------- SLAVE -------------------------------- */ 918 919 /* Returns 1 if the given replication state is a handshake state, 920 * 0 otherwise. */ 921 int slaveIsInHandshakeState(void) { 922 return server.repl_state >= REPL_STATE_RECEIVE_PONG && 923 server.repl_state <= REPL_STATE_RECEIVE_PSYNC; 924 } 925 926 /* Avoid the master to detect the slave is timing out while loading the 927 * RDB file in initial synchronization. We send a single newline character 928 * that is valid protocol but is guaranteed to either be sent entierly or 929 * not, since the byte is indivisible. 930 * 931 * The function is called in two contexts: while we flush the current 932 * data with emptyDb(), and while we load the new data received as an 933 * RDB file from the master. */ 934 void replicationSendNewlineToMaster(void) { 935 static time_t newline_sent; 936 if (time(NULL) != newline_sent) { 937 newline_sent = time(NULL); 938 if (write(server.repl_transfer_s,"\n",1) == -1) { 939 /* Pinging back in this stage is best-effort. */ 940 } 941 } 942 } 943 944 /* Callback used by emptyDb() while flushing away old data to load 945 * the new dataset received by the master. */ 946 void replicationEmptyDbCallback(void *privdata) { 947 UNUSED(privdata); 948 replicationSendNewlineToMaster(); 949 } 950 951 /* Once we have a link with the master and the synchroniziation was 952 * performed, this function materializes the master client we store 953 * at server.master, starting from the specified file descriptor. */ 954 void replicationCreateMasterClient(int fd) { 955 server.master = createClient(fd); 956 server.master->flags |= CLIENT_MASTER; 957 server.master->authenticated = 1; 958 server.repl_state = REPL_STATE_CONNECTED; 959 server.master->reploff = server.repl_master_initial_offset; 960 memcpy(server.master->replrunid, server.repl_master_runid, 961 sizeof(server.repl_master_runid)); 962 /* If master offset is set to -1, this master is old and is not 963 * PSYNC capable, so we flag it accordingly. */ 964 if (server.master->reploff == -1) 965 server.master->flags |= CLIENT_PRE_PSYNC; 966 } 967 968 /* Asynchronously read the SYNC payload we receive from a master */ 969 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ 970 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { 971 char buf[4096]; 972 ssize_t nread, readlen; 973 off_t left; 974 UNUSED(el); 975 UNUSED(privdata); 976 UNUSED(mask); 977 978 /* Static vars used to hold the EOF mark, and the last bytes received 979 * form the server: when they match, we reached the end of the transfer. */ 980 static char eofmark[CONFIG_RUN_ID_SIZE]; 981 static char lastbytes[CONFIG_RUN_ID_SIZE]; 982 static int usemark = 0; 983 984 /* If repl_transfer_size == -1 we still have to read the bulk length 985 * from the master reply. */ 986 if (server.repl_transfer_size == -1) { 987 if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { 988 serverLog(LL_WARNING, 989 "I/O error reading bulk count from MASTER: %s", 990 strerror(errno)); 991 goto error; 992 } 993 994 if (buf[0] == '-') { 995 serverLog(LL_WARNING, 996 "MASTER aborted replication with an error: %s", 997 buf+1); 998 goto error; 999 } else if (buf[0] == '\0') { 1000 /* At this stage just a newline works as a PING in order to take 1001 * the connection live. So we refresh our last interaction 1002 * timestamp. */ 1003 server.repl_transfer_lastio = server.unixtime; 1004 return; 1005 } else if (buf[0] != '$') { 1006 serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); 1007 goto error; 1008 } 1009 1010 /* There are two possible forms for the bulk payload. One is the 1011 * usual $<count> bulk format. The other is used for diskless transfers 1012 * when the master does not know beforehand the size of the file to 1013 * transfer. In the latter case, the following format is used: 1014 * 1015 * $EOF:<40 bytes delimiter> 1016 * 1017 * At the end of the file the announced delimiter is transmitted. The 1018 * delimiter is long and random enough that the probability of a 1019 * collision with the actual file content can be ignored. */ 1020 if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) { 1021 usemark = 1; 1022 memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE); 1023 memset(lastbytes,0,CONFIG_RUN_ID_SIZE); 1024 /* Set any repl_transfer_size to avoid entering this code path 1025 * at the next call. */ 1026 server.repl_transfer_size = 0; 1027 serverLog(LL_NOTICE, 1028 "MASTER <-> SLAVE sync: receiving streamed RDB from master"); 1029 } else { 1030 usemark = 0; 1031 server.repl_transfer_size = strtol(buf+1,NULL,10); 1032 serverLog(LL_NOTICE, 1033 "MASTER <-> SLAVE sync: receiving %lld bytes from master", 1034 (long long) server.repl_transfer_size); 1035 } 1036 return; 1037 } 1038 1039 /* Read bulk data */ 1040 if (usemark) { 1041 readlen = sizeof(buf); 1042 } else { 1043 left = server.repl_transfer_size - server.repl_transfer_read; 1044 readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); 1045 } 1046 1047 nread = read(fd,buf,readlen); 1048 if (nread <= 0) { 1049 serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", 1050 (nread == -1) ? strerror(errno) : "connection lost"); 1051 cancelReplicationHandshake(); 1052 return; 1053 } 1054 server.stat_net_input_bytes += nread; 1055 1056 /* When a mark is used, we want to detect EOF asap in order to avoid 1057 * writing the EOF mark into the file... */ 1058 int eof_reached = 0; 1059 1060 if (usemark) { 1061 /* Update the last bytes array, and check if it matches our delimiter.*/ 1062 if (nread >= CONFIG_RUN_ID_SIZE) { 1063 memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE); 1064 } else { 1065 int rem = CONFIG_RUN_ID_SIZE-nread; 1066 memmove(lastbytes,lastbytes+nread,rem); 1067 memcpy(lastbytes+rem,buf,nread); 1068 } 1069 if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; 1070 } 1071 1072 server.repl_transfer_lastio = server.unixtime; 1073 if (write(server.repl_transfer_fd,buf,nread) != nread) { 1074 serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); 1075 goto error; 1076 } 1077 server.repl_transfer_read += nread; 1078 1079 /* Delete the last 40 bytes from the file if we reached EOF. */ 1080 if (usemark && eof_reached) { 1081 if (ftruncate(server.repl_transfer_fd, 1082 server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) 1083 { 1084 serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); 1085 goto error; 1086 } 1087 } 1088 1089 /* Sync data on disk from time to time, otherwise at the end of the transfer 1090 * we may suffer a big delay as the memory buffers are copied into the 1091 * actual disk. */ 1092 if (server.repl_transfer_read >= 1093 server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) 1094 { 1095 off_t sync_size = server.repl_transfer_read - 1096 server.repl_transfer_last_fsync_off; 1097 rdb_fsync_range(server.repl_transfer_fd, 1098 server.repl_transfer_last_fsync_off, sync_size); 1099 server.repl_transfer_last_fsync_off += sync_size; 1100 } 1101 1102 /* Check if the transfer is now complete */ 1103 if (!usemark) { 1104 if (server.repl_transfer_read == server.repl_transfer_size) 1105 eof_reached = 1; 1106 } 1107 1108 if (eof_reached) { 1109 if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { 1110 serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); 1111 cancelReplicationHandshake(); 1112 return; 1113 } 1114 serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data"); 1115 signalFlushedDb(-1); 1116 emptyDb(replicationEmptyDbCallback); 1117 /* Before loading the DB into memory we need to delete the readable 1118 * handler, otherwise it will get called recursively since 1119 * rdbLoad() will call the event loop to process events from time to 1120 * time for non blocking loading. */ 1121 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); 1122 serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); 1123 if (rdbLoad(server.rdb_filename) != C_OK) { 1124 serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); 1125 cancelReplicationHandshake(); 1126 return; 1127 } 1128 /* Final setup of the connected slave <- master link */ 1129 zfree(server.repl_transfer_tmpfile); 1130 close(server.repl_transfer_fd); 1131 replicationCreateMasterClient(server.repl_transfer_s); 1132 serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success"); 1133 /* Restart the AOF subsystem now that we finished the sync. This 1134 * will trigger an AOF rewrite, and when done will start appending 1135 * to the new file. */ 1136 if (server.aof_state != AOF_OFF) { 1137 int retry = 10; 1138 1139 stopAppendOnly(); 1140 while (retry-- && startAppendOnly() == C_ERR) { 1141 serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second."); 1142 sleep(1); 1143 } 1144 if (!retry) { 1145 serverLog(LL_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now."); 1146 exit(1); 1147 } 1148 } 1149 } 1150 1151 return; 1152 1153 error: 1154 cancelReplicationHandshake(); 1155 return; 1156 } 1157 1158 /* Send a synchronous command to the master. Used to send AUTH and 1159 * REPLCONF commands before starting the replication with SYNC. 1160 * 1161 * The command returns an sds string representing the result of the 1162 * operation. On error the first byte is a "-". 1163 */ 1164 #define SYNC_CMD_READ (1<<0) 1165 #define SYNC_CMD_WRITE (1<<1) 1166 #define SYNC_CMD_FULL (SYNC_CMD_READ|SYNC_CMD_WRITE) 1167 char *sendSynchronousCommand(int flags, int fd, ...) { 1168 1169 /* Create the command to send to the master, we use simple inline 1170 * protocol for simplicity as currently we only send simple strings. */ 1171 if (flags & SYNC_CMD_WRITE) { 1172 char *arg; 1173 va_list ap; 1174 sds cmd = sdsempty(); 1175 va_start(ap,fd); 1176 1177 while(1) { 1178 arg = va_arg(ap, char*); 1179 if (arg == NULL) break; 1180 1181 if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1); 1182 cmd = sdscat(cmd,arg); 1183 } 1184 cmd = sdscatlen(cmd,"\r\n",2); 1185 1186 /* Transfer command to the server. */ 1187 if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) 1188 == -1) 1189 { 1190 sdsfree(cmd); 1191 return sdscatprintf(sdsempty(),"-Writing to master: %s", 1192 strerror(errno)); 1193 } 1194 sdsfree(cmd); 1195 va_end(ap); 1196 } 1197 1198 /* Read the reply from the server. */ 1199 if (flags & SYNC_CMD_READ) { 1200 char buf[256]; 1201 1202 if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) 1203 == -1) 1204 { 1205 return sdscatprintf(sdsempty(),"-Reading from master: %s", 1206 strerror(errno)); 1207 } 1208 server.repl_transfer_lastio = server.unixtime; 1209 return sdsnew(buf); 1210 } 1211 return NULL; 1212 } 1213 1214 /* Try a partial resynchronization with the master if we are about to reconnect. 1215 * If there is no cached master structure, at least try to issue a 1216 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC 1217 * command in order to obtain the master run id and the master replication 1218 * global offset. 1219 * 1220 * This function is designed to be called from syncWithMaster(), so the 1221 * following assumptions are made: 1222 * 1223 * 1) We pass the function an already connected socket "fd". 1224 * 2) This function does not close the file descriptor "fd". However in case 1225 * of successful partial resynchronization, the function will reuse 1226 * 'fd' as file descriptor of the server.master client structure. 1227 * 1228 * The function is split in two halves: if read_reply is 0, the function 1229 * writes the PSYNC command on the socket, and a new function call is 1230 * needed, with read_reply set to 1, in order to read the reply of the 1231 * command. This is useful in order to support non blocking operations, so 1232 * that we write, return into the event loop, and read when there are data. 1233 * 1234 * When read_reply is 0 the function returns PSYNC_WRITE_ERR if there 1235 * was a write error, or PSYNC_WAIT_REPLY to signal we need another call 1236 * with read_reply set to 1. However even when read_reply is set to 1 1237 * the function may return PSYNC_WAIT_REPLY again to signal there were 1238 * insufficient data to read to complete its work. We should re-enter 1239 * into the event loop and wait in such a case. 1240 * 1241 * The function returns: 1242 * 1243 * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue. 1244 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed. 1245 * In this case the master run_id and global replication 1246 * offset is saved. 1247 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and 1248 * the caller should fall back to SYNC. 1249 * PSYNC_WRITE_ERR: There was an error writing the command to the socket. 1250 * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1. 1251 * 1252 * Notable side effects: 1253 * 1254 * 1) As a side effect of the function call the function removes the readable 1255 * event handler from "fd", unless the return value is PSYNC_WAIT_REPLY. 1256 * 2) server.repl_master_initial_offset is set to the right value according 1257 * to the master reply. This will be used to populate the 'server.master' 1258 * structure replication offset. 1259 */ 1260 1261 #define PSYNC_WRITE_ERROR 0 1262 #define PSYNC_WAIT_REPLY 1 1263 #define PSYNC_CONTINUE 2 1264 #define PSYNC_FULLRESYNC 3 1265 #define PSYNC_NOT_SUPPORTED 4 1266 int slaveTryPartialResynchronization(int fd, int read_reply) { 1267 char *psync_runid; 1268 char psync_offset[32]; 1269 sds reply; 1270 1271 /* Writing half */ 1272 if (!read_reply) { 1273 /* Initially set repl_master_initial_offset to -1 to mark the current 1274 * master run_id and offset as not valid. Later if we'll be able to do 1275 * a FULL resync using the PSYNC command we'll set the offset at the 1276 * right value, so that this information will be propagated to the 1277 * client structure representing the master into server.master. */ 1278 server.repl_master_initial_offset = -1; 1279 1280 if (server.cached_master) { 1281 psync_runid = server.cached_master->replrunid; 1282 snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); 1283 serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); 1284 } else { 1285 serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)"); 1286 psync_runid = "?"; 1287 memcpy(psync_offset,"-1",3); 1288 } 1289 1290 /* Issue the PSYNC command */ 1291 reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL); 1292 if (reply != NULL) { 1293 serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply); 1294 sdsfree(reply); 1295 aeDeleteFileEvent(server.el,fd,AE_READABLE); 1296 return PSYNC_WRITE_ERROR; 1297 } 1298 return PSYNC_WAIT_REPLY; 1299 } 1300 1301 /* Reading half */ 1302 reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1303 if (sdslen(reply) == 0) { 1304 /* The master may send empty newlines after it receives PSYNC 1305 * and before to reply, just to keep the connection alive. */ 1306 sdsfree(reply); 1307 return PSYNC_WAIT_REPLY; 1308 } 1309 1310 aeDeleteFileEvent(server.el,fd,AE_READABLE); 1311 1312 if (!strncmp(reply,"+FULLRESYNC",11)) { 1313 char *runid = NULL, *offset = NULL; 1314 1315 /* FULL RESYNC, parse the reply in order to extract the run id 1316 * and the replication offset. */ 1317 runid = strchr(reply,' '); 1318 if (runid) { 1319 runid++; 1320 offset = strchr(runid,' '); 1321 if (offset) offset++; 1322 } 1323 if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) { 1324 serverLog(LL_WARNING, 1325 "Master replied with wrong +FULLRESYNC syntax."); 1326 /* This is an unexpected condition, actually the +FULLRESYNC 1327 * reply means that the master supports PSYNC, but the reply 1328 * format seems wrong. To stay safe we blank the master 1329 * runid to make sure next PSYNCs will fail. */ 1330 memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1); 1331 } else { 1332 memcpy(server.repl_master_runid, runid, offset-runid-1); 1333 server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0'; 1334 server.repl_master_initial_offset = strtoll(offset,NULL,10); 1335 serverLog(LL_NOTICE,"Full resync from master: %s:%lld", 1336 server.repl_master_runid, 1337 server.repl_master_initial_offset); 1338 } 1339 /* We are going to full resync, discard the cached master structure. */ 1340 replicationDiscardCachedMaster(); 1341 sdsfree(reply); 1342 return PSYNC_FULLRESYNC; 1343 } 1344 1345 if (!strncmp(reply,"+CONTINUE",9)) { 1346 /* Partial resync was accepted, set the replication state accordingly */ 1347 serverLog(LL_NOTICE, 1348 "Successful partial resynchronization with master."); 1349 sdsfree(reply); 1350 replicationResurrectCachedMaster(fd); 1351 return PSYNC_CONTINUE; 1352 } 1353 1354 /* If we reach this point we received either an error since the master does 1355 * not understand PSYNC, or an unexpected reply from the master. 1356 * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */ 1357 1358 if (strncmp(reply,"-ERR",4)) { 1359 /* If it's not an error, log the unexpected event. */ 1360 serverLog(LL_WARNING, 1361 "Unexpected reply to PSYNC from master: %s", reply); 1362 } else { 1363 serverLog(LL_NOTICE, 1364 "Master does not support PSYNC or is in " 1365 "error state (reply: %s)", reply); 1366 } 1367 sdsfree(reply); 1368 replicationDiscardCachedMaster(); 1369 return PSYNC_NOT_SUPPORTED; 1370 } 1371 1372 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { 1373 char tmpfile[256], *err = NULL; 1374 int dfd, maxtries = 5; 1375 int sockerr = 0, psync_result; 1376 socklen_t errlen = sizeof(sockerr); 1377 UNUSED(el); 1378 UNUSED(privdata); 1379 UNUSED(mask); 1380 1381 /* If this event fired after the user turned the instance into a master 1382 * with SLAVEOF NO ONE we must just return ASAP. */ 1383 if (server.repl_state == REPL_STATE_NONE) { 1384 close(fd); 1385 return; 1386 } 1387 1388 /* Check for errors in the socket. */ 1389 if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) 1390 sockerr = errno; 1391 if (sockerr) { 1392 serverLog(LL_WARNING,"Error condition on socket for SYNC: %s", 1393 strerror(sockerr)); 1394 goto error; 1395 } 1396 1397 /* Send a PING to check the master is able to reply without errors. */ 1398 if (server.repl_state == REPL_STATE_CONNECTING) { 1399 serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event."); 1400 /* Delete the writable event so that the readable event remains 1401 * registered and we can wait for the PONG reply. */ 1402 aeDeleteFileEvent(server.el,fd,AE_WRITABLE); 1403 server.repl_state = REPL_STATE_RECEIVE_PONG; 1404 /* Send the PING, don't check for errors at all, we have the timeout 1405 * that will take care about this. */ 1406 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL); 1407 if (err) goto write_error; 1408 return; 1409 } 1410 1411 /* Receive the PONG command. */ 1412 if (server.repl_state == REPL_STATE_RECEIVE_PONG) { 1413 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1414 1415 /* We accept only two replies as valid, a positive +PONG reply 1416 * (we just check for "+") or an authentication error. 1417 * Note that older versions of Redis replied with "operation not 1418 * permitted" instead of using a proper error code, so we test 1419 * both. */ 1420 if (err[0] != '+' && 1421 strncmp(err,"-NOAUTH",7) != 0 && 1422 strncmp(err,"-ERR operation not permitted",28) != 0) 1423 { 1424 serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err); 1425 sdsfree(err); 1426 goto error; 1427 } else { 1428 serverLog(LL_NOTICE, 1429 "Master replied to PING, replication can continue..."); 1430 } 1431 sdsfree(err); 1432 server.repl_state = REPL_STATE_SEND_AUTH; 1433 } 1434 1435 /* AUTH with the master if required. */ 1436 if (server.repl_state == REPL_STATE_SEND_AUTH) { 1437 if (server.masterauth) { 1438 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL); 1439 if (err) goto write_error; 1440 server.repl_state = REPL_STATE_RECEIVE_AUTH; 1441 return; 1442 } else { 1443 server.repl_state = REPL_STATE_SEND_PORT; 1444 } 1445 } 1446 1447 /* Receive AUTH reply. */ 1448 if (server.repl_state == REPL_STATE_RECEIVE_AUTH) { 1449 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1450 if (err[0] == '-') { 1451 serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err); 1452 sdsfree(err); 1453 goto error; 1454 } 1455 sdsfree(err); 1456 server.repl_state = REPL_STATE_SEND_PORT; 1457 } 1458 1459 /* Set the slave port, so that Master's INFO command can list the 1460 * slave listening port correctly. */ 1461 if (server.repl_state == REPL_STATE_SEND_PORT) { 1462 sds port = sdsfromlonglong(server.port); 1463 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", 1464 "listening-port",port, NULL); 1465 sdsfree(port); 1466 if (err) goto write_error; 1467 sdsfree(err); 1468 server.repl_state = REPL_STATE_RECEIVE_PORT; 1469 return; 1470 } 1471 1472 /* Receive REPLCONF listening-port reply. */ 1473 if (server.repl_state == REPL_STATE_RECEIVE_PORT) { 1474 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1475 /* Ignore the error if any, not all the Redis versions support 1476 * REPLCONF listening-port. */ 1477 if (err[0] == '-') { 1478 serverLog(LL_NOTICE,"(Non critical) Master does not understand " 1479 "REPLCONF listening-port: %s", err); 1480 } 1481 sdsfree(err); 1482 server.repl_state = REPL_STATE_SEND_CAPA; 1483 } 1484 1485 /* Inform the master of our capabilities. While we currently send 1486 * just one capability, it is possible to chain new capabilities here 1487 * in the form of REPLCONF capa X capa Y capa Z ... 1488 * The master will ignore capabilities it does not understand. */ 1489 if (server.repl_state == REPL_STATE_SEND_CAPA) { 1490 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", 1491 "capa","eof",NULL); 1492 if (err) goto write_error; 1493 sdsfree(err); 1494 server.repl_state = REPL_STATE_RECEIVE_CAPA; 1495 return; 1496 } 1497 1498 /* Receive CAPA reply. */ 1499 if (server.repl_state == REPL_STATE_RECEIVE_CAPA) { 1500 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1501 /* Ignore the error if any, not all the Redis versions support 1502 * REPLCONF capa. */ 1503 if (err[0] == '-') { 1504 serverLog(LL_NOTICE,"(Non critical) Master does not understand " 1505 "REPLCONF capa: %s", err); 1506 } 1507 sdsfree(err); 1508 server.repl_state = REPL_STATE_SEND_PSYNC; 1509 } 1510 1511 /* Try a partial resynchonization. If we don't have a cached master 1512 * slaveTryPartialResynchronization() will at least try to use PSYNC 1513 * to start a full resynchronization so that we get the master run id 1514 * and the global offset, to try a partial resync at the next 1515 * reconnection attempt. */ 1516 if (server.repl_state == REPL_STATE_SEND_PSYNC) { 1517 if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) { 1518 err = sdsnew("Write error sending the PSYNC command."); 1519 goto write_error; 1520 } 1521 server.repl_state = REPL_STATE_RECEIVE_PSYNC; 1522 return; 1523 } 1524 1525 /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */ 1526 if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) { 1527 serverLog(LL_WARNING,"syncWithMaster(): state machine error, " 1528 "state should be RECEIVE_PSYNC but is %d", 1529 server.repl_state); 1530 goto error; 1531 } 1532 1533 psync_result = slaveTryPartialResynchronization(fd,1); 1534 if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ 1535 1536 /* Note: if PSYNC does not return WAIT_REPLY, it will take care of 1537 * uninstalling the read handler from the file descriptor. */ 1538 1539 if (psync_result == PSYNC_CONTINUE) { 1540 serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization."); 1541 return; 1542 } 1543 1544 /* PSYNC failed or is not supported: we want our slaves to resync with us 1545 * as well, if we have any (chained replication case). The mater may 1546 * transfer us an entirely different data set and we have no way to 1547 * incrementally feed our slaves after that. */ 1548 disconnectSlaves(); /* Force our slaves to resync with us as well. */ 1549 freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ 1550 1551 /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC 1552 * and the server.repl_master_runid and repl_master_initial_offset are 1553 * already populated. */ 1554 if (psync_result == PSYNC_NOT_SUPPORTED) { 1555 serverLog(LL_NOTICE,"Retrying with SYNC..."); 1556 if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { 1557 serverLog(LL_WARNING,"I/O error writing to MASTER: %s", 1558 strerror(errno)); 1559 goto error; 1560 } 1561 } 1562 1563 /* Prepare a suitable temp file for bulk transfer */ 1564 while(maxtries--) { 1565 snprintf(tmpfile,256, 1566 "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); 1567 dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); 1568 if (dfd != -1) break; 1569 sleep(1); 1570 } 1571 if (dfd == -1) { 1572 serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno)); 1573 goto error; 1574 } 1575 1576 /* Setup the non blocking download of the bulk file. */ 1577 if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) 1578 == AE_ERR) 1579 { 1580 serverLog(LL_WARNING, 1581 "Can't create readable event for SYNC: %s (fd=%d)", 1582 strerror(errno),fd); 1583 goto error; 1584 } 1585 1586 server.repl_state = REPL_STATE_TRANSFER; 1587 server.repl_transfer_size = -1; 1588 server.repl_transfer_read = 0; 1589 server.repl_transfer_last_fsync_off = 0; 1590 server.repl_transfer_fd = dfd; 1591 server.repl_transfer_lastio = server.unixtime; 1592 server.repl_transfer_tmpfile = zstrdup(tmpfile); 1593 return; 1594 1595 error: 1596 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1597 close(fd); 1598 server.repl_transfer_s = -1; 1599 server.repl_state = REPL_STATE_CONNECT; 1600 return; 1601 1602 write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */ 1603 serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err); 1604 sdsfree(err); 1605 goto error; 1606 } 1607 1608 int connectWithMaster(void) { 1609 int fd; 1610 1611 fd = anetTcpNonBlockBestEffortBindConnect(NULL, 1612 server.masterhost,server.masterport,NET_FIRST_BIND_ADDR); 1613 if (fd == -1) { 1614 serverLog(LL_WARNING,"Unable to connect to MASTER: %s", 1615 strerror(errno)); 1616 return C_ERR; 1617 } 1618 1619 if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == 1620 AE_ERR) 1621 { 1622 close(fd); 1623 serverLog(LL_WARNING,"Can't create readable event for SYNC"); 1624 return C_ERR; 1625 } 1626 1627 server.repl_transfer_lastio = server.unixtime; 1628 server.repl_transfer_s = fd; 1629 server.repl_state = REPL_STATE_CONNECTING; 1630 return C_OK; 1631 } 1632 1633 /* This function can be called when a non blocking connection is currently 1634 * in progress to undo it. 1635 * Never call this function directly, use cancelReplicationHandshake() instead. 1636 */ 1637 void undoConnectWithMaster(void) { 1638 int fd = server.repl_transfer_s; 1639 1640 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1641 close(fd); 1642 server.repl_transfer_s = -1; 1643 } 1644 1645 /* Abort the async download of the bulk dataset while SYNC-ing with master. 1646 * Never call this function directly, use cancelReplicationHandshake() instead. 1647 */ 1648 void replicationAbortSyncTransfer(void) { 1649 serverAssert(server.repl_state == REPL_STATE_TRANSFER); 1650 undoConnectWithMaster(); 1651 close(server.repl_transfer_fd); 1652 unlink(server.repl_transfer_tmpfile); 1653 zfree(server.repl_transfer_tmpfile); 1654 } 1655 1656 /* This function aborts a non blocking replication attempt if there is one 1657 * in progress, by canceling the non-blocking connect attempt or 1658 * the initial bulk transfer. 1659 * 1660 * If there was a replication handshake in progress 1 is returned and 1661 * the replication state (server.repl_state) set to REPL_STATE_CONNECT. 1662 * 1663 * Otherwise zero is returned and no operation is perforemd at all. */ 1664 int cancelReplicationHandshake(void) { 1665 if (server.repl_state == REPL_STATE_TRANSFER) { 1666 replicationAbortSyncTransfer(); 1667 server.repl_state = REPL_STATE_CONNECT; 1668 } else if (server.repl_state == REPL_STATE_CONNECTING || 1669 slaveIsInHandshakeState()) 1670 { 1671 undoConnectWithMaster(); 1672 server.repl_state = REPL_STATE_CONNECT; 1673 } else { 1674 return 0; 1675 } 1676 return 1; 1677 } 1678 1679 /* Set replication to the specified master address and port. */ 1680 void replicationSetMaster(char *ip, int port) { 1681 sdsfree(server.masterhost); 1682 server.masterhost = sdsnew(ip); 1683 server.masterport = port; 1684 if (server.master) freeClient(server.master); 1685 disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */ 1686 disconnectSlaves(); /* Force our slaves to resync with us as well. */ 1687 replicationDiscardCachedMaster(); /* Don't try a PSYNC. */ 1688 freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ 1689 cancelReplicationHandshake(); 1690 server.repl_state = REPL_STATE_CONNECT; 1691 server.master_repl_offset = 0; 1692 server.repl_down_since = 0; 1693 } 1694 1695 /* Cancel replication, setting the instance as a master itself. */ 1696 void replicationUnsetMaster(void) { 1697 if (server.masterhost == NULL) return; /* Nothing to do. */ 1698 sdsfree(server.masterhost); 1699 server.masterhost = NULL; 1700 if (server.master) { 1701 if (listLength(server.slaves) == 0) { 1702 /* If this instance is turned into a master and there are no 1703 * slaves, it inherits the replication offset from the master. 1704 * Under certain conditions this makes replicas comparable by 1705 * replication offset to understand what is the most updated. */ 1706 server.master_repl_offset = server.master->reploff; 1707 freeReplicationBacklog(); 1708 } 1709 freeClient(server.master); 1710 } 1711 replicationDiscardCachedMaster(); 1712 cancelReplicationHandshake(); 1713 server.repl_state = REPL_STATE_NONE; 1714 } 1715 1716 /* This function is called when the slave lose the connection with the 1717 * master into an unexpected way. */ 1718 void replicationHandleMasterDisconnection(void) { 1719 server.master = NULL; 1720 server.repl_state = REPL_STATE_CONNECT; 1721 server.repl_down_since = server.unixtime; 1722 /* We lost connection with our master, don't disconnect slaves yet, 1723 * maybe we'll be able to PSYNC with our master later. We'll disconnect 1724 * the slaves only if we'll have to do a full resync with our master. */ 1725 } 1726 1727 void slaveofCommand(client *c) { 1728 /* SLAVEOF is not allowed in cluster mode as replication is automatically 1729 * configured using the current address of the master node. */ 1730 if (server.cluster_enabled) { 1731 addReplyError(c,"SLAVEOF not allowed in cluster mode."); 1732 return; 1733 } 1734 1735 /* The special host/port combination "NO" "ONE" turns the instance 1736 * into a master. Otherwise the new master address is set. */ 1737 if (!strcasecmp(c->argv[1]->ptr,"no") && 1738 !strcasecmp(c->argv[2]->ptr,"one")) { 1739 if (server.masterhost) { 1740 replicationUnsetMaster(); 1741 sds client = catClientInfoString(sdsempty(),c); 1742 serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", 1743 client); 1744 sdsfree(client); 1745 } 1746 } else { 1747 long port; 1748 1749 if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK)) 1750 return; 1751 1752 /* Check if we are already attached to the specified slave */ 1753 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) 1754 && server.masterport == port) { 1755 serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed."); 1756 addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); 1757 return; 1758 } 1759 /* There was no previous master or the user specified a different one, 1760 * we can continue. */ 1761 replicationSetMaster(c->argv[1]->ptr, port); 1762 sds client = catClientInfoString(sdsempty(),c); 1763 serverLog(LL_NOTICE,"SLAVE OF %s:%d enabled (user request from '%s')", 1764 server.masterhost, server.masterport, client); 1765 sdsfree(client); 1766 } 1767 addReply(c,shared.ok); 1768 } 1769 1770 /* ROLE command: provide information about the role of the instance 1771 * (master or slave) and additional information related to replication 1772 * in an easy to process format. */ 1773 void roleCommand(client *c) { 1774 if (server.masterhost == NULL) { 1775 listIter li; 1776 listNode *ln; 1777 void *mbcount; 1778 int slaves = 0; 1779 1780 addReplyMultiBulkLen(c,3); 1781 addReplyBulkCBuffer(c,"master",6); 1782 addReplyLongLong(c,server.master_repl_offset); 1783 mbcount = addDeferredMultiBulkLength(c); 1784 listRewind(server.slaves,&li); 1785 while((ln = listNext(&li))) { 1786 client *slave = ln->value; 1787 char ip[NET_IP_STR_LEN]; 1788 1789 if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1) continue; 1790 if (slave->replstate != SLAVE_STATE_ONLINE) continue; 1791 addReplyMultiBulkLen(c,3); 1792 addReplyBulkCString(c,ip); 1793 addReplyBulkLongLong(c,slave->slave_listening_port); 1794 addReplyBulkLongLong(c,slave->repl_ack_off); 1795 slaves++; 1796 } 1797 setDeferredMultiBulkLength(c,mbcount,slaves); 1798 } else { 1799 char *slavestate = NULL; 1800 1801 addReplyMultiBulkLen(c,5); 1802 addReplyBulkCBuffer(c,"slave",5); 1803 addReplyBulkCString(c,server.masterhost); 1804 addReplyLongLong(c,server.masterport); 1805 if (slaveIsInHandshakeState()) { 1806 slavestate = "handshake"; 1807 } else { 1808 switch(server.repl_state) { 1809 case REPL_STATE_NONE: slavestate = "none"; break; 1810 case REPL_STATE_CONNECT: slavestate = "connect"; break; 1811 case REPL_STATE_CONNECTING: slavestate = "connecting"; break; 1812 case REPL_STATE_TRANSFER: slavestate = "sync"; break; 1813 case REPL_STATE_CONNECTED: slavestate = "connected"; break; 1814 default: slavestate = "unknown"; break; 1815 } 1816 } 1817 addReplyBulkCString(c,slavestate); 1818 addReplyLongLong(c,server.master ? server.master->reploff : -1); 1819 } 1820 } 1821 1822 /* Send a REPLCONF ACK command to the master to inform it about the current 1823 * processed offset. If we are not connected with a master, the command has 1824 * no effects. */ 1825 void replicationSendAck(void) { 1826 client *c = server.master; 1827 1828 if (c != NULL) { 1829 c->flags |= CLIENT_MASTER_FORCE_REPLY; 1830 addReplyMultiBulkLen(c,3); 1831 addReplyBulkCString(c,"REPLCONF"); 1832 addReplyBulkCString(c,"ACK"); 1833 addReplyBulkLongLong(c,c->reploff); 1834 c->flags &= ~CLIENT_MASTER_FORCE_REPLY; 1835 } 1836 } 1837 1838 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */ 1839 1840 /* In order to implement partial synchronization we need to be able to cache 1841 * our master's client structure after a transient disconnection. 1842 * It is cached into server.cached_master and flushed away using the following 1843 * functions. */ 1844 1845 /* This function is called by freeClient() in order to cache the master 1846 * client structure instead of destryoing it. freeClient() will return 1847 * ASAP after this function returns, so every action needed to avoid problems 1848 * with a client that is really "suspended" has to be done by this function. 1849 * 1850 * The other functions that will deal with the cached master are: 1851 * 1852 * replicationDiscardCachedMaster() that will make sure to kill the client 1853 * as for some reason we don't want to use it in the future. 1854 * 1855 * replicationResurrectCachedMaster() that is used after a successful PSYNC 1856 * handshake in order to reactivate the cached master. 1857 */ 1858 void replicationCacheMaster(client *c) { 1859 serverAssert(server.master != NULL && server.cached_master == NULL); 1860 serverLog(LL_NOTICE,"Caching the disconnected master state."); 1861 1862 /* Unlink the client from the server structures. */ 1863 unlinkClient(c); 1864 1865 /* Save the master. Server.master will be set to null later by 1866 * replicationHandleMasterDisconnection(). */ 1867 server.cached_master = server.master; 1868 1869 /* Invalidate the Peer ID cache. */ 1870 if (c->peerid) { 1871 sdsfree(c->peerid); 1872 c->peerid = NULL; 1873 } 1874 1875 /* Caching the master happens instead of the actual freeClient() call, 1876 * so make sure to adjust the replication state. This function will 1877 * also set server.master to NULL. */ 1878 replicationHandleMasterDisconnection(); 1879 } 1880 1881 /* Free a cached master, called when there are no longer the conditions for 1882 * a partial resync on reconnection. */ 1883 void replicationDiscardCachedMaster(void) { 1884 if (server.cached_master == NULL) return; 1885 1886 serverLog(LL_NOTICE,"Discarding previously cached master state."); 1887 server.cached_master->flags &= ~CLIENT_MASTER; 1888 freeClient(server.cached_master); 1889 server.cached_master = NULL; 1890 } 1891 1892 /* Turn the cached master into the current master, using the file descriptor 1893 * passed as argument as the socket for the new master. 1894 * 1895 * This function is called when successfully setup a partial resynchronization 1896 * so the stream of data that we'll receive will start from were this 1897 * master left. */ 1898 void replicationResurrectCachedMaster(int newfd) { 1899 server.master = server.cached_master; 1900 server.cached_master = NULL; 1901 server.master->fd = newfd; 1902 server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP); 1903 server.master->authenticated = 1; 1904 server.master->lastinteraction = server.unixtime; 1905 server.repl_state = REPL_STATE_CONNECTED; 1906 1907 /* Re-add to the list of clients. */ 1908 listAddNodeTail(server.clients,server.master); 1909 if (aeCreateFileEvent(server.el, newfd, AE_READABLE, 1910 readQueryFromClient, server.master)) { 1911 serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); 1912 freeClientAsync(server.master); /* Close ASAP. */ 1913 } 1914 1915 /* We may also need to install the write handler as well if there is 1916 * pending data in the write buffers. */ 1917 if (clientHasPendingReplies(server.master)) { 1918 if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, 1919 sendReplyToClient, server.master)) { 1920 serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); 1921 freeClientAsync(server.master); /* Close ASAP. */ 1922 } 1923 } 1924 } 1925 1926 /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */ 1927 1928 /* This function counts the number of slaves with lag <= min-slaves-max-lag. 1929 * If the option is active, the server will prevent writes if there are not 1930 * enough connected slaves with the specified lag (or less). */ 1931 void refreshGoodSlavesCount(void) { 1932 listIter li; 1933 listNode *ln; 1934 int good = 0; 1935 1936 if (!server.repl_min_slaves_to_write || 1937 !server.repl_min_slaves_max_lag) return; 1938 1939 listRewind(server.slaves,&li); 1940 while((ln = listNext(&li))) { 1941 client *slave = ln->value; 1942 time_t lag = server.unixtime - slave->repl_ack_time; 1943 1944 if (slave->replstate == SLAVE_STATE_ONLINE && 1945 lag <= server.repl_min_slaves_max_lag) good++; 1946 } 1947 server.repl_good_slaves_count = good; 1948 } 1949 1950 /* ----------------------- REPLICATION SCRIPT CACHE -------------------------- 1951 * The goal of this code is to keep track of scripts already sent to every 1952 * connected slave, in order to be able to replicate EVALSHA as it is without 1953 * translating it to EVAL every time it is possible. 1954 * 1955 * We use a capped collection implemented by a hash table for fast lookup 1956 * of scripts we can send as EVALSHA, plus a linked list that is used for 1957 * eviction of the oldest entry when the max number of items is reached. 1958 * 1959 * We don't care about taking a different cache for every different slave 1960 * since to fill the cache again is not very costly, the goal of this code 1961 * is to avoid that the same big script is trasmitted a big number of times 1962 * per second wasting bandwidth and processor speed, but it is not a problem 1963 * if we need to rebuild the cache from scratch from time to time, every used 1964 * script will need to be transmitted a single time to reappear in the cache. 1965 * 1966 * This is how the system works: 1967 * 1968 * 1) Every time a new slave connects, we flush the whole script cache. 1969 * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without 1970 * trying to convert EVAL into EVALSHA specifically for slaves. 1971 * 3) Every time we trasmit a script as EVAL to the slaves, we also add the 1972 * corresponding SHA1 of the script into the cache as we are sure every 1973 * slave knows about the script starting from now. 1974 * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves 1975 * and at the same time flush the script cache. 1976 * 5) When the last slave disconnects, flush the cache. 1977 * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded 1978 * in the master sometimes. 1979 */ 1980 1981 /* Initialize the script cache, only called at startup. */ 1982 void replicationScriptCacheInit(void) { 1983 server.repl_scriptcache_size = 10000; 1984 server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL); 1985 server.repl_scriptcache_fifo = listCreate(); 1986 } 1987 1988 /* Empty the script cache. Should be called every time we are no longer sure 1989 * that every slave knows about all the scripts in our set, or when the 1990 * current AOF "context" is no longer aware of the script. In general we 1991 * should flush the cache: 1992 * 1993 * 1) Every time a new slave reconnects to this master and performs a 1994 * full SYNC (PSYNC does not require flushing). 1995 * 2) Every time an AOF rewrite is performed. 1996 * 3) Every time we are left without slaves at all, and AOF is off, in order 1997 * to reclaim otherwise unused memory. 1998 */ 1999 void replicationScriptCacheFlush(void) { 2000 dictEmpty(server.repl_scriptcache_dict,NULL); 2001 listRelease(server.repl_scriptcache_fifo); 2002 server.repl_scriptcache_fifo = listCreate(); 2003 } 2004 2005 /* Add an entry into the script cache, if we reach max number of entries the 2006 * oldest is removed from the list. */ 2007 void replicationScriptCacheAdd(sds sha1) { 2008 int retval; 2009 sds key = sdsdup(sha1); 2010 2011 /* Evict oldest. */ 2012 if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size) 2013 { 2014 listNode *ln = listLast(server.repl_scriptcache_fifo); 2015 sds oldest = listNodeValue(ln); 2016 2017 retval = dictDelete(server.repl_scriptcache_dict,oldest); 2018 serverAssert(retval == DICT_OK); 2019 listDelNode(server.repl_scriptcache_fifo,ln); 2020 } 2021 2022 /* Add current. */ 2023 retval = dictAdd(server.repl_scriptcache_dict,key,NULL); 2024 listAddNodeHead(server.repl_scriptcache_fifo,key); 2025 serverAssert(retval == DICT_OK); 2026 } 2027 2028 /* Returns non-zero if the specified entry exists inside the cache, that is, 2029 * if all the slaves are aware of this script SHA1. */ 2030 int replicationScriptCacheExists(sds sha1) { 2031 return dictFind(server.repl_scriptcache_dict,sha1) != NULL; 2032 } 2033 2034 /* ----------------------- SYNCHRONOUS REPLICATION -------------------------- 2035 * Redis synchronous replication design can be summarized in points: 2036 * 2037 * - Redis masters have a global replication offset, used by PSYNC. 2038 * - Master increment the offset every time new commands are sent to slaves. 2039 * - Slaves ping back masters with the offset processed so far. 2040 * 2041 * So synchronous replication adds a new WAIT command in the form: 2042 * 2043 * WAIT <num_replicas> <milliseconds_timeout> 2044 * 2045 * That returns the number of replicas that processed the query when 2046 * we finally have at least num_replicas, or when the timeout was 2047 * reached. 2048 * 2049 * The command is implemented in this way: 2050 * 2051 * - Every time a client processes a command, we remember the replication 2052 * offset after sending that command to the slaves. 2053 * - When WAIT is called, we ask slaves to send an acknowledgement ASAP. 2054 * The client is blocked at the same time (see blocked.c). 2055 * - Once we receive enough ACKs for a given offset or when the timeout 2056 * is reached, the WAIT command is unblocked and the reply sent to the 2057 * client. 2058 */ 2059 2060 /* This just set a flag so that we broadcast a REPLCONF GETACK command 2061 * to all the slaves in the beforeSleep() function. Note that this way 2062 * we "group" all the clients that want to wait for synchronouns replication 2063 * in a given event loop iteration, and send a single GETACK for them all. */ 2064 void replicationRequestAckFromSlaves(void) { 2065 server.get_ack_from_slaves = 1; 2066 } 2067 2068 /* Return the number of slaves that already acknowledged the specified 2069 * replication offset. */ 2070 int replicationCountAcksByOffset(long long offset) { 2071 listIter li; 2072 listNode *ln; 2073 int count = 0; 2074 2075 listRewind(server.slaves,&li); 2076 while((ln = listNext(&li))) { 2077 client *slave = ln->value; 2078 2079 if (slave->replstate != SLAVE_STATE_ONLINE) continue; 2080 if (slave->repl_ack_off >= offset) count++; 2081 } 2082 return count; 2083 } 2084 2085 /* WAIT for N replicas to acknowledge the processing of our latest 2086 * write command (and all the previous commands). */ 2087 void waitCommand(client *c) { 2088 mstime_t timeout; 2089 long numreplicas, ackreplicas; 2090 long long offset = c->woff; 2091 2092 /* Argument parsing. */ 2093 if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK) 2094 return; 2095 if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS) 2096 != C_OK) return; 2097 2098 /* First try without blocking at all. */ 2099 ackreplicas = replicationCountAcksByOffset(c->woff); 2100 if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) { 2101 addReplyLongLong(c,ackreplicas); 2102 return; 2103 } 2104 2105 /* Otherwise block the client and put it into our list of clients 2106 * waiting for ack from slaves. */ 2107 c->bpop.timeout = timeout; 2108 c->bpop.reploffset = offset; 2109 c->bpop.numreplicas = numreplicas; 2110 listAddNodeTail(server.clients_waiting_acks,c); 2111 blockClient(c,BLOCKED_WAIT); 2112 2113 /* Make sure that the server will send an ACK request to all the slaves 2114 * before returning to the event loop. */ 2115 replicationRequestAckFromSlaves(); 2116 } 2117 2118 /* This is called by unblockClient() to perform the blocking op type 2119 * specific cleanup. We just remove the client from the list of clients 2120 * waiting for replica acks. Never call it directly, call unblockClient() 2121 * instead. */ 2122 void unblockClientWaitingReplicas(client *c) { 2123 listNode *ln = listSearchKey(server.clients_waiting_acks,c); 2124 serverAssert(ln != NULL); 2125 listDelNode(server.clients_waiting_acks,ln); 2126 } 2127 2128 /* Check if there are clients blocked in WAIT that can be unblocked since 2129 * we received enough ACKs from slaves. */ 2130 void processClientsWaitingReplicas(void) { 2131 long long last_offset = 0; 2132 int last_numreplicas = 0; 2133 2134 listIter li; 2135 listNode *ln; 2136 2137 listRewind(server.clients_waiting_acks,&li); 2138 while((ln = listNext(&li))) { 2139 client *c = ln->value; 2140 2141 /* Every time we find a client that is satisfied for a given 2142 * offset and number of replicas, we remember it so the next client 2143 * may be unblocked without calling replicationCountAcksByOffset() 2144 * if the requested offset / replicas were equal or less. */ 2145 if (last_offset && last_offset > c->bpop.reploffset && 2146 last_numreplicas > c->bpop.numreplicas) 2147 { 2148 unblockClient(c); 2149 addReplyLongLong(c,last_numreplicas); 2150 } else { 2151 int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset); 2152 2153 if (numreplicas >= c->bpop.numreplicas) { 2154 last_offset = c->bpop.reploffset; 2155 last_numreplicas = numreplicas; 2156 unblockClient(c); 2157 addReplyLongLong(c,numreplicas); 2158 } 2159 } 2160 } 2161 } 2162 2163 /* Return the slave replication offset for this instance, that is 2164 * the offset for which we already processed the master replication stream. */ 2165 long long replicationGetSlaveOffset(void) { 2166 long long offset = 0; 2167 2168 if (server.masterhost != NULL) { 2169 if (server.master) { 2170 offset = server.master->reploff; 2171 } else if (server.cached_master) { 2172 offset = server.cached_master->reploff; 2173 } 2174 } 2175 /* offset may be -1 when the master does not support it at all, however 2176 * this function is designed to return an offset that can express the 2177 * amount of data processed by the master, so we return a positive 2178 * integer. */ 2179 if (offset < 0) offset = 0; 2180 return offset; 2181 } 2182 2183 /* --------------------------- REPLICATION CRON ---------------------------- */ 2184 2185 /* Replication cron function, called 1 time per second. */ 2186 void replicationCron(void) { 2187 static long long replication_cron_loops = 0; 2188 2189 /* Non blocking connection timeout? */ 2190 if (server.masterhost && 2191 (server.repl_state == REPL_STATE_CONNECTING || 2192 slaveIsInHandshakeState()) && 2193 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) 2194 { 2195 serverLog(LL_WARNING,"Timeout connecting to the MASTER..."); 2196 cancelReplicationHandshake(); 2197 } 2198 2199 /* Bulk transfer I/O timeout? */ 2200 if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER && 2201 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) 2202 { 2203 serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value."); 2204 cancelReplicationHandshake(); 2205 } 2206 2207 /* Timed out master when we are an already connected slave? */ 2208 if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED && 2209 (time(NULL)-server.master->lastinteraction) > server.repl_timeout) 2210 { 2211 serverLog(LL_WARNING,"MASTER timeout: no data nor PING received..."); 2212 freeClient(server.master); 2213 } 2214 2215 /* Check if we should connect to a MASTER */ 2216 if (server.repl_state == REPL_STATE_CONNECT) { 2217 serverLog(LL_NOTICE,"Connecting to MASTER %s:%d", 2218 server.masterhost, server.masterport); 2219 if (connectWithMaster() == C_OK) { 2220 serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started"); 2221 } 2222 } 2223 2224 /* Send ACK to master from time to time. 2225 * Note that we do not send periodic acks to masters that don't 2226 * support PSYNC and replication offsets. */ 2227 if (server.masterhost && server.master && 2228 !(server.master->flags & CLIENT_PRE_PSYNC)) 2229 replicationSendAck(); 2230 2231 /* If we have attached slaves, PING them from time to time. 2232 * So slaves can implement an explicit timeout to masters, and will 2233 * be able to detect a link disconnection even if the TCP connection 2234 * will not actually go down. */ 2235 listIter li; 2236 listNode *ln; 2237 robj *ping_argv[1]; 2238 2239 /* First, send PING according to ping_slave_period. */ 2240 if ((replication_cron_loops % server.repl_ping_slave_period) == 0) { 2241 ping_argv[0] = createStringObject("PING",4); 2242 replicationFeedSlaves(server.slaves, server.slaveseldb, 2243 ping_argv, 1); 2244 decrRefCount(ping_argv[0]); 2245 } 2246 2247 /* Second, send a newline to all the slaves in pre-synchronization 2248 * stage, that is, slaves waiting for the master to create the RDB file. 2249 * The newline will be ignored by the slave but will refresh the 2250 * last-io timer preventing a timeout. In this case we ignore the 2251 * ping period and refresh the connection once per second since certain 2252 * timeouts are set at a few seconds (example: PSYNC response). */ 2253 listRewind(server.slaves,&li); 2254 while((ln = listNext(&li))) { 2255 client *slave = ln->value; 2256 2257 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START || 2258 (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && 2259 server.rdb_child_type != RDB_CHILD_TYPE_SOCKET)) 2260 { 2261 if (write(slave->fd, "\n", 1) == -1) { 2262 /* Don't worry, it's just a ping. */ 2263 } 2264 } 2265 } 2266 2267 /* Disconnect timedout slaves. */ 2268 if (listLength(server.slaves)) { 2269 listIter li; 2270 listNode *ln; 2271 2272 listRewind(server.slaves,&li); 2273 while((ln = listNext(&li))) { 2274 client *slave = ln->value; 2275 2276 if (slave->replstate != SLAVE_STATE_ONLINE) continue; 2277 if (slave->flags & CLIENT_PRE_PSYNC) continue; 2278 if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) 2279 { 2280 serverLog(LL_WARNING, "Disconnecting timedout slave: %s", 2281 replicationGetSlaveName(slave)); 2282 freeClient(slave); 2283 } 2284 } 2285 } 2286 2287 /* If we have no attached slaves and there is a replication backlog 2288 * using memory, free it after some (configured) time. */ 2289 if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && 2290 server.repl_backlog) 2291 { 2292 time_t idle = server.unixtime - server.repl_no_slaves_since; 2293 2294 if (idle > server.repl_backlog_time_limit) { 2295 freeReplicationBacklog(); 2296 serverLog(LL_NOTICE, 2297 "Replication backlog freed after %d seconds " 2298 "without connected slaves.", 2299 (int) server.repl_backlog_time_limit); 2300 } 2301 } 2302 2303 /* If AOF is disabled and we no longer have attached slaves, we can 2304 * free our Replication Script Cache as there is no need to propagate 2305 * EVALSHA at all. */ 2306 if (listLength(server.slaves) == 0 && 2307 server.aof_state == AOF_OFF && 2308 listLength(server.repl_scriptcache_fifo) != 0) 2309 { 2310 replicationScriptCacheFlush(); 2311 } 2312 2313 /* Start a BGSAVE good for replication if we have slaves in 2314 * WAIT_BGSAVE_START state. 2315 * 2316 * In case of diskless replication, we make sure to wait the specified 2317 * number of seconds (according to configuration) so that other slaves 2318 * have the time to arrive before we start streaming. */ 2319 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { 2320 time_t idle, max_idle = 0; 2321 int slaves_waiting = 0; 2322 int mincapa = -1; 2323 listNode *ln; 2324 listIter li; 2325 2326 listRewind(server.slaves,&li); 2327 while((ln = listNext(&li))) { 2328 client *slave = ln->value; 2329 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { 2330 idle = server.unixtime - slave->lastinteraction; 2331 if (idle > max_idle) max_idle = idle; 2332 slaves_waiting++; 2333 mincapa = (mincapa == -1) ? slave->slave_capa : 2334 (mincapa & slave->slave_capa); 2335 } 2336 } 2337 2338 if (slaves_waiting && 2339 (!server.repl_diskless_sync || 2340 max_idle > server.repl_diskless_sync_delay)) 2341 { 2342 /* Start the BGSAVE. The called function may start a 2343 * BGSAVE with socket target or disk target depending on the 2344 * configuration and slaves capabilities. */ 2345 startBgsaveForReplication(mincapa); 2346 } 2347 } 2348 2349 /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ 2350 refreshGoodSlavesCount(); 2351 replication_cron_loops++; /* Incremented with frequency 1 HZ. */ 2352 } 2353