1 /* Asynchronous replication implementation. 2 * 3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * * Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * * Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * * Neither the name of Redis nor the names of its contributors may be used 15 * to endorse or promote products derived from this software without 16 * specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 
29 */ 30 31 32 #include "server.h" 33 34 #include <sys/time.h> 35 #include <unistd.h> 36 #include <fcntl.h> 37 #include <sys/socket.h> 38 #include <sys/stat.h> 39 40 void replicationDiscardCachedMaster(void); 41 void replicationResurrectCachedMaster(int newfd); 42 void replicationSendAck(void); 43 void putSlaveOnline(client *slave); 44 int cancelReplicationHandshake(void); 45 46 /* --------------------------- Utility functions ---------------------------- */ 47 48 /* Return the pointer to a string representing the slave ip:listening_port 49 * pair. Mostly useful for logging, since we want to log a slave using its 50 * IP address and it's listening port which is more clear for the user, for 51 * example: "Closing connection with slave 10.1.2.3:6380". */ 52 char *replicationGetSlaveName(client *c) { 53 static char buf[NET_PEER_ID_LEN]; 54 char ip[NET_IP_STR_LEN]; 55 56 ip[0] = '\0'; 57 buf[0] = '\0'; 58 if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) { 59 if (c->slave_listening_port) 60 anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port); 61 else 62 snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip); 63 } else { 64 snprintf(buf,sizeof(buf),"client id #%llu", 65 (unsigned long long) c->id); 66 } 67 return buf; 68 } 69 70 /* ---------------------------------- MASTER -------------------------------- */ 71 72 void createReplicationBacklog(void) { 73 serverAssert(server.repl_backlog == NULL); 74 server.repl_backlog = zmalloc(server.repl_backlog_size); 75 server.repl_backlog_histlen = 0; 76 server.repl_backlog_idx = 0; 77 /* When a new backlog buffer is created, we increment the replication 78 * offset by one to make sure we'll not be able to PSYNC with any 79 * previous slave. This is needed because we avoid incrementing the 80 * master_repl_offset if no backlog exists nor slaves are attached. 
*/ 81 server.master_repl_offset++; 82 83 /* We don't have any data inside our buffer, but virtually the first 84 * byte we have is the next byte that will be generated for the 85 * replication stream. */ 86 server.repl_backlog_off = server.master_repl_offset+1; 87 } 88 89 /* This function is called when the user modifies the replication backlog 90 * size at runtime. It is up to the function to both update the 91 * server.repl_backlog_size and to resize the buffer and setup it so that 92 * it contains the same data as the previous one (possibly less data, but 93 * the most recent bytes, or the same data and more free space in case the 94 * buffer is enlarged). */ 95 void resizeReplicationBacklog(long long newsize) { 96 if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE) 97 newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; 98 if (server.repl_backlog_size == newsize) return; 99 100 server.repl_backlog_size = newsize; 101 if (server.repl_backlog != NULL) { 102 /* What we actually do is to flush the old buffer and realloc a new 103 * empty one. It will refill with new data incrementally. 104 * The reason is that copying a few gigabytes adds latency and even 105 * worse often we need to alloc additional space before freeing the 106 * old buffer. */ 107 zfree(server.repl_backlog); 108 server.repl_backlog = zmalloc(server.repl_backlog_size); 109 server.repl_backlog_histlen = 0; 110 server.repl_backlog_idx = 0; 111 /* Next byte we have is... the next since the buffer is empty. */ 112 server.repl_backlog_off = server.master_repl_offset+1; 113 } 114 } 115 116 void freeReplicationBacklog(void) { 117 serverAssert(listLength(server.slaves) == 0); 118 zfree(server.repl_backlog); 119 server.repl_backlog = NULL; 120 } 121 122 /* Add data to the replication backlog. 123 * This function also increments the global replication offset stored at 124 * server.master_repl_offset, because there is no case where we want to feed 125 * the backlog without incrementing the buffer. 
*/ 126 void feedReplicationBacklog(void *ptr, size_t len) { 127 unsigned char *p = ptr; 128 129 server.master_repl_offset += len; 130 131 /* This is a circular buffer, so write as much data we can at every 132 * iteration and rewind the "idx" index if we reach the limit. */ 133 while(len) { 134 size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; 135 if (thislen > len) thislen = len; 136 memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); 137 server.repl_backlog_idx += thislen; 138 if (server.repl_backlog_idx == server.repl_backlog_size) 139 server.repl_backlog_idx = 0; 140 len -= thislen; 141 p += thislen; 142 server.repl_backlog_histlen += thislen; 143 } 144 if (server.repl_backlog_histlen > server.repl_backlog_size) 145 server.repl_backlog_histlen = server.repl_backlog_size; 146 /* Set the offset of the first byte we have in the backlog. */ 147 server.repl_backlog_off = server.master_repl_offset - 148 server.repl_backlog_histlen + 1; 149 } 150 151 /* Wrapper for feedReplicationBacklog() that takes Redis string objects 152 * as input. */ 153 void feedReplicationBacklogWithObject(robj *o) { 154 char llstr[LONG_STR_SIZE]; 155 void *p; 156 size_t len; 157 158 if (o->encoding == OBJ_ENCODING_INT) { 159 len = ll2string(llstr,sizeof(llstr),(long)o->ptr); 160 p = llstr; 161 } else { 162 len = sdslen(o->ptr); 163 p = o->ptr; 164 } 165 feedReplicationBacklog(p,len); 166 } 167 168 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { 169 listNode *ln; 170 listIter li; 171 int j, len; 172 char llstr[LONG_STR_SIZE]; 173 174 /* If there aren't slaves, and there is no backlog buffer to populate, 175 * we can return ASAP. */ 176 if (server.repl_backlog == NULL && listLength(slaves) == 0) return; 177 178 /* We can't have slaves attached and no backlog. */ 179 serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); 180 181 /* Send SELECT command to every slave if needed. 
*/ 182 if (server.slaveseldb != dictid) { 183 robj *selectcmd; 184 185 /* For a few DBs we have pre-computed SELECT command. */ 186 if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { 187 selectcmd = shared.select[dictid]; 188 } else { 189 int dictid_len; 190 191 dictid_len = ll2string(llstr,sizeof(llstr),dictid); 192 selectcmd = createObject(OBJ_STRING, 193 sdscatprintf(sdsempty(), 194 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", 195 dictid_len, llstr)); 196 } 197 198 /* Add the SELECT command into the backlog. */ 199 if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); 200 201 /* Send it to slaves. */ 202 listRewind(slaves,&li); 203 while((ln = listNext(&li))) { 204 client *slave = ln->value; 205 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; 206 addReply(slave,selectcmd); 207 } 208 209 if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) 210 decrRefCount(selectcmd); 211 } 212 server.slaveseldb = dictid; 213 214 /* Write the command to the replication backlog if any. */ 215 if (server.repl_backlog) { 216 char aux[LONG_STR_SIZE+3]; 217 218 /* Add the multi bulk reply length. */ 219 aux[0] = '*'; 220 len = ll2string(aux+1,sizeof(aux)-1,argc); 221 aux[len+1] = '\r'; 222 aux[len+2] = '\n'; 223 feedReplicationBacklog(aux,len+3); 224 225 for (j = 0; j < argc; j++) { 226 long objlen = stringObjectLen(argv[j]); 227 228 /* We need to feed the buffer with the object as a bulk reply 229 * not just as a plain string, so create the $..CRLF payload len 230 * and add the final CRLF */ 231 aux[0] = '$'; 232 len = ll2string(aux+1,sizeof(aux)-1,objlen); 233 aux[len+1] = '\r'; 234 aux[len+2] = '\n'; 235 feedReplicationBacklog(aux,len+3); 236 feedReplicationBacklogWithObject(argv[j]); 237 feedReplicationBacklog(aux+len+1,2); 238 } 239 } 240 241 /* Write the command to every slave. 
*/ 242 listRewind(server.slaves,&li); 243 while((ln = listNext(&li))) { 244 client *slave = ln->value; 245 246 /* Don't feed slaves that are still waiting for BGSAVE to start */ 247 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; 248 249 /* Feed slaves that are waiting for the initial SYNC (so these commands 250 * are queued in the output buffer until the initial SYNC completes), 251 * or are already in sync with the master. */ 252 253 /* Add the multi bulk length. */ 254 addReplyMultiBulkLen(slave,argc); 255 256 /* Finally any additional argument that was not stored inside the 257 * static buffer if any (from j to argc). */ 258 for (j = 0; j < argc; j++) 259 addReplyBulk(slave,argv[j]); 260 } 261 } 262 263 void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { 264 listNode *ln; 265 listIter li; 266 int j; 267 sds cmdrepr = sdsnew("+"); 268 robj *cmdobj; 269 struct timeval tv; 270 271 gettimeofday(&tv,NULL); 272 cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); 273 if (c->flags & CLIENT_LUA) { 274 cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); 275 } else if (c->flags & CLIENT_UNIX_SOCKET) { 276 cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); 277 } else { 278 cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c)); 279 } 280 281 for (j = 0; j < argc; j++) { 282 if (argv[j]->encoding == OBJ_ENCODING_INT) { 283 cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr); 284 } else { 285 cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, 286 sdslen(argv[j]->ptr)); 287 } 288 if (j != argc-1) 289 cmdrepr = sdscatlen(cmdrepr," ",1); 290 } 291 cmdrepr = sdscatlen(cmdrepr,"\r\n",2); 292 cmdobj = createObject(OBJ_STRING,cmdrepr); 293 294 listRewind(monitors,&li); 295 while((ln = listNext(&li))) { 296 client *monitor = ln->value; 297 addReply(monitor,cmdobj); 298 } 299 decrRefCount(cmdobj); 300 } 301 302 /* Feed the slave 'c' with the replication 
backlog starting from the 303 * specified 'offset' up to the end of the backlog. */ 304 long long addReplyReplicationBacklog(client *c, long long offset) { 305 long long j, skip, len; 306 307 serverLog(LL_DEBUG, "[PSYNC] Slave request offset: %lld", offset); 308 309 if (server.repl_backlog_histlen == 0) { 310 serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); 311 return 0; 312 } 313 314 serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", 315 server.repl_backlog_size); 316 serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", 317 server.repl_backlog_off); 318 serverLog(LL_DEBUG, "[PSYNC] History len: %lld", 319 server.repl_backlog_histlen); 320 serverLog(LL_DEBUG, "[PSYNC] Current index: %lld", 321 server.repl_backlog_idx); 322 323 /* Compute the amount of bytes we need to discard. */ 324 skip = offset - server.repl_backlog_off; 325 serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); 326 327 /* Point j to the oldest byte, that is actaully our 328 * server.repl_backlog_off byte. */ 329 j = (server.repl_backlog_idx + 330 (server.repl_backlog_size-server.repl_backlog_histlen)) % 331 server.repl_backlog_size; 332 serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j); 333 334 /* Discard the amount of data to seek to the specified 'offset'. */ 335 j = (j + skip) % server.repl_backlog_size; 336 337 /* Feed slave with data. Since it is a circular buffer we have to 338 * split the reply in two parts if we are cross-boundary. */ 339 len = server.repl_backlog_histlen - skip; 340 serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); 341 while(len) { 342 long long thislen = 343 ((server.repl_backlog_size - j) < len) ? 
344 (server.repl_backlog_size - j) : len; 345 346 serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen); 347 addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); 348 len -= thislen; 349 j = 0; 350 } 351 return server.repl_backlog_histlen - skip; 352 } 353 354 /* Return the offset to provide as reply to the PSYNC command received 355 * from the slave. The returned value is only valid immediately after 356 * the BGSAVE process started and before executing any other command 357 * from clients. */ 358 long long getPsyncInitialOffset(void) { 359 long long psync_offset = server.master_repl_offset; 360 /* Add 1 to psync_offset if it the replication backlog does not exists 361 * as when it will be created later we'll increment the offset by one. */ 362 if (server.repl_backlog == NULL) psync_offset++; 363 return psync_offset; 364 } 365 366 /* Send a FULLRESYNC reply in the specific case of a full resynchronization, 367 * as a side effect setup the slave for a full sync in different ways: 368 * 369 * 1) Remember, into the slave client structure, the offset we sent 370 * here, so that if new slaves will later attach to the same 371 * background RDB saving process (by duplicating this client output 372 * buffer), we can get the right offset from this slave. 373 * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that 374 * we start accumulating differences from this point. 375 * 3) Force the replication stream to re-emit a SELECT statement so 376 * the new slave incremental differences will start selecting the 377 * right database number. 378 * 379 * Normally this function should be called immediately after a successful 380 * BGSAVE for replication was started, or when there is one already in 381 * progress that we attached our slave to. 
*/ 382 int replicationSetupSlaveForFullResync(client *slave, long long offset) { 383 char buf[128]; 384 int buflen; 385 386 slave->psync_initial_offset = offset; 387 slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; 388 /* We are going to accumulate the incremental changes for this 389 * slave as well. Set slaveseldb to -1 in order to force to re-emit 390 * a SLEECT statement in the replication stream. */ 391 server.slaveseldb = -1; 392 393 /* Don't send this reply to slaves that approached us with 394 * the old SYNC command. */ 395 if (!(slave->flags & CLIENT_PRE_PSYNC)) { 396 buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", 397 server.runid,offset); 398 if (write(slave->fd,buf,buflen) != buflen) { 399 freeClientAsync(slave); 400 return C_ERR; 401 } 402 } 403 return C_OK; 404 } 405 406 /* This function handles the PSYNC command from the point of view of a 407 * master receiving a request for partial resynchronization. 408 * 409 * On success return C_OK, otherwise C_ERR is returned and we proceed 410 * with the usual full resync. */ 411 int masterTryPartialResynchronization(client *c) { 412 long long psync_offset, psync_len; 413 char *master_runid = c->argv[1]->ptr; 414 char buf[128]; 415 int buflen; 416 417 /* Is the runid of this master the same advertised by the wannabe slave 418 * via PSYNC? If runid changed this master is a different instance and 419 * there is no way to continue. */ 420 if (strcasecmp(master_runid, server.runid)) { 421 /* Run id "?" is used by slaves that want to force a full resync. */ 422 if (master_runid[0] != '?') { 423 serverLog(LL_NOTICE,"Partial resynchronization not accepted: " 424 "Runid mismatch (Client asked for runid '%s', my runid is '%s')", 425 master_runid, server.runid); 426 } else { 427 serverLog(LL_NOTICE,"Full resync requested by slave %s", 428 replicationGetSlaveName(c)); 429 } 430 goto need_full_resync; 431 } 432 433 /* We still have the data our slave is asking for? 
*/ 434 if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != 435 C_OK) goto need_full_resync; 436 if (!server.repl_backlog || 437 psync_offset < server.repl_backlog_off || 438 psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) 439 { 440 serverLog(LL_NOTICE, 441 "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset); 442 if (psync_offset > server.master_repl_offset) { 443 serverLog(LL_WARNING, 444 "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); 445 } 446 goto need_full_resync; 447 } 448 449 /* If we reached this point, we are able to perform a partial resync: 450 * 1) Set client state to make it a slave. 451 * 2) Inform the client we can continue with +CONTINUE 452 * 3) Send the backlog data (from the offset to the end) to the slave. */ 453 c->flags |= CLIENT_SLAVE; 454 c->replstate = SLAVE_STATE_ONLINE; 455 c->repl_ack_time = server.unixtime; 456 c->repl_put_online_on_ack = 0; 457 listAddNodeTail(server.slaves,c); 458 /* We can't use the connection buffers since they are used to accumulate 459 * new commands at this stage. But we are sure the socket send buffer is 460 * empty so this write will never fail actually. */ 461 buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); 462 if (write(c->fd,buf,buflen) != buflen) { 463 freeClientAsync(c); 464 return C_OK; 465 } 466 psync_len = addReplyReplicationBacklog(c,psync_offset); 467 serverLog(LL_NOTICE, 468 "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", 469 replicationGetSlaveName(c), 470 psync_len, psync_offset); 471 /* Note that we don't need to set the selected DB at server.slaveseldb 472 * to -1 to force the master to emit SELECT, since the slave already 473 * has this state from the previous connection with the master. 
*/ 474 475 refreshGoodSlavesCount(); 476 return C_OK; /* The caller can return, no full resync needed. */ 477 478 need_full_resync: 479 /* We need a full resync for some reason... Note that we can't 480 * reply to PSYNC right now if a full SYNC is needed. The reply 481 * must include the master offset at the time the RDB file we transfer 482 * is generated, so we need to delay the reply to that moment. */ 483 return C_ERR; 484 } 485 486 /* Start a BGSAVE for replication goals, which is, selecting the disk or 487 * socket target depending on the configuration, and making sure that 488 * the script cache is flushed before to start. 489 * 490 * The mincapa argument is the bitwise AND among all the slaves capabilities 491 * of the slaves waiting for this BGSAVE, so represents the slave capabilities 492 * all the slaves support. Can be tested via SLAVE_CAPA_* macros. 493 * 494 * Side effects, other than starting a BGSAVE: 495 * 496 * 1) Handle the slaves in WAIT_START state, by preparing them for a full 497 * sync if the BGSAVE was succesfully started, or sending them an error 498 * and dropping them from the list of slaves. 499 * 500 * 2) Flush the Lua scripting script cache if the BGSAVE was actually 501 * started. 502 * 503 * Returns C_OK on success or C_ERR otherwise. */ 504 int startBgsaveForReplication(int mincapa) { 505 int retval; 506 int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); 507 listIter li; 508 listNode *ln; 509 510 serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s", 511 socket_target ? "slaves sockets" : "disk"); 512 513 if (socket_target) 514 retval = rdbSaveToSlavesSockets(); 515 else 516 retval = rdbSaveBackground(server.rdb_filename); 517 518 /* If we failed to BGSAVE, remove the slaves waiting for a full 519 * resynchorinization from the list of salves, inform them with 520 * an error about what happened, close the connection ASAP. 
*/ 521 if (retval == C_ERR) { 522 serverLog(LL_WARNING,"BGSAVE for replication failed"); 523 listRewind(server.slaves,&li); 524 while((ln = listNext(&li))) { 525 client *slave = ln->value; 526 527 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { 528 slave->flags &= ~CLIENT_SLAVE; 529 listDelNode(server.slaves,ln); 530 addReplyError(slave, 531 "BGSAVE failed, replication can't continue"); 532 slave->flags |= CLIENT_CLOSE_AFTER_REPLY; 533 } 534 } 535 return retval; 536 } 537 538 /* If the target is socket, rdbSaveToSlavesSockets() already setup 539 * the salves for a full resync. Otherwise for disk target do it now.*/ 540 if (!socket_target) { 541 listRewind(server.slaves,&li); 542 while((ln = listNext(&li))) { 543 client *slave = ln->value; 544 545 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { 546 replicationSetupSlaveForFullResync(slave, 547 getPsyncInitialOffset()); 548 } 549 } 550 } 551 552 /* Flush the script cache, since we need that slave differences are 553 * accumulated without requiring slaves to match our cached scripts. */ 554 if (retval == C_OK) replicationScriptCacheFlush(); 555 return retval; 556 } 557 558 /* SYNC and PSYNC command implemenation. */ 559 void syncCommand(client *c) { 560 /* ignore SYNC if already slave or in monitor mode */ 561 if (c->flags & CLIENT_SLAVE) return; 562 563 /* Refuse SYNC requests if we are a slave but the link with our master 564 * is not ok... */ 565 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { 566 addReplyError(c,"Can't SYNC while not connected with my master"); 567 return; 568 } 569 570 /* SYNC can't be issued when the server has pending data to send to 571 * the client about already issued commands. We need a fresh reply 572 * buffer registering the differences between the BGSAVE and the current 573 * dataset, so that we can copy to other slaves if needed. 
*/ 574 if (clientHasPendingReplies(c)) { 575 addReplyError(c,"SYNC and PSYNC are invalid with pending output"); 576 return; 577 } 578 579 serverLog(LL_NOTICE,"Slave %s asks for synchronization", 580 replicationGetSlaveName(c)); 581 582 /* Try a partial resynchronization if this is a PSYNC command. 583 * If it fails, we continue with usual full resynchronization, however 584 * when this happens masterTryPartialResynchronization() already 585 * replied with: 586 * 587 * +FULLRESYNC <runid> <offset> 588 * 589 * So the slave knows the new runid and offset to try a PSYNC later 590 * if the connection with the master is lost. */ 591 if (!strcasecmp(c->argv[0]->ptr,"psync")) { 592 if (masterTryPartialResynchronization(c) == C_OK) { 593 server.stat_sync_partial_ok++; 594 return; /* No full resync needed, return. */ 595 } else { 596 char *master_runid = c->argv[1]->ptr; 597 598 /* Increment stats for failed PSYNCs, but only if the 599 * runid is not "?", as this is used by slaves to force a full 600 * resync on purpose when they are not albe to partially 601 * resync. */ 602 if (master_runid[0] != '?') server.stat_sync_partial_err++; 603 } 604 } else { 605 /* If a slave uses SYNC, we are dealing with an old implementation 606 * of the replication protocol (like redis-cli --slave). Flag the client 607 * so that we don't expect to receive REPLCONF ACK feedbacks. */ 608 c->flags |= CLIENT_PRE_PSYNC; 609 } 610 611 /* Full resynchronization. */ 612 server.stat_sync_full++; 613 614 /* Setup the slave as one waiting for BGSAVE to start. The following code 615 * paths will change the state if we handle the slave differently. */ 616 c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; 617 if (server.repl_disable_tcp_nodelay) 618 anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */ 619 c->repldbfd = -1; 620 c->flags |= CLIENT_SLAVE; 621 listAddNodeTail(server.slaves,c); 622 623 /* CASE 1: BGSAVE is in progress, with disk target. 
*/ 624 if (server.rdb_child_pid != -1 && 625 server.rdb_child_type == RDB_CHILD_TYPE_DISK) 626 { 627 /* Ok a background save is in progress. Let's check if it is a good 628 * one for replication, i.e. if there is another slave that is 629 * registering differences since the server forked to save. */ 630 client *slave; 631 listNode *ln; 632 listIter li; 633 634 listRewind(server.slaves,&li); 635 while((ln = listNext(&li))) { 636 slave = ln->value; 637 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; 638 } 639 /* To attach this slave, we check that it has at least all the 640 * capabilities of the slave that triggered the current BGSAVE. */ 641 if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { 642 /* Perfect, the server is already registering differences for 643 * another slave. Set the right state, and copy the buffer. */ 644 copyClientOutputBuffer(c,slave); 645 replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); 646 serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); 647 } else { 648 /* No way, we need to wait for the next BGSAVE in order to 649 * register differences. */ 650 serverLog(LL_NOTICE,"Waiting for next BGSAVE for SYNC"); 651 } 652 653 /* CASE 2: BGSAVE is in progress, with socket target. */ 654 } else if (server.rdb_child_pid != -1 && 655 server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) 656 { 657 /* There is an RDB child process but it is writing directly to 658 * children sockets. We need to wait for the next BGSAVE 659 * in order to synchronize. */ 660 serverLog(LL_NOTICE,"Waiting for next BGSAVE for SYNC"); 661 662 /* CASE 3: There is no BGSAVE is progress. */ 663 } else { 664 if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) { 665 /* Diskless replication RDB child is created inside 666 * replicationCron() since we want to delay its start a 667 * few seconds to wait for more slaves to arrive. 
*/ 668 if (server.repl_diskless_sync_delay) 669 serverLog(LL_NOTICE,"Delay next BGSAVE for SYNC"); 670 } else { 671 /* Target is disk (or the slave is not capable of supporting 672 * diskless replication) and we don't have a BGSAVE in progress, 673 * let's start one. */ 674 if (startBgsaveForReplication(c->slave_capa) != C_OK) return; 675 } 676 } 677 678 if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) 679 createReplicationBacklog(); 680 return; 681 } 682 683 /* REPLCONF <option> <value> <option> <value> ... 684 * This command is used by a slave in order to configure the replication 685 * process before starting it with the SYNC command. 686 * 687 * Currently the only use of this command is to communicate to the master 688 * what is the listening port of the Slave redis instance, so that the 689 * master can accurately list slaves and their listening ports in 690 * the INFO output. 691 * 692 * In the future the same command can be used in order to configure 693 * the replication to initiate an incremental replication instead of a 694 * full resync. */ 695 void replconfCommand(client *c) { 696 int j; 697 698 if ((c->argc % 2) == 0) { 699 /* Number of arguments must be odd to make sure that every 700 * option has a corresponding value. */ 701 addReply(c,shared.syntaxerr); 702 return; 703 } 704 705 /* Process every option-value pair. */ 706 for (j = 1; j < c->argc; j+=2) { 707 if (!strcasecmp(c->argv[j]->ptr,"listening-port")) { 708 long port; 709 710 if ((getLongFromObjectOrReply(c,c->argv[j+1], 711 &port,NULL) != C_OK)) 712 return; 713 c->slave_listening_port = port; 714 } else if (!strcasecmp(c->argv[j]->ptr,"capa")) { 715 /* Ignore capabilities not understood by this master. */ 716 if (!strcasecmp(c->argv[j+1]->ptr,"eof")) 717 c->slave_capa |= SLAVE_CAPA_EOF; 718 } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { 719 /* REPLCONF ACK is used by slave to inform the master the amount 720 * of replication stream that it processed so far. 
It is an 721 * internal only command that normal clients should never use. */ 722 long long offset; 723 724 if (!(c->flags & CLIENT_SLAVE)) return; 725 if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK)) 726 return; 727 if (offset > c->repl_ack_off) 728 c->repl_ack_off = offset; 729 c->repl_ack_time = server.unixtime; 730 /* If this was a diskless replication, we need to really put 731 * the slave online when the first ACK is received (which 732 * confirms slave is online and ready to get more data). */ 733 if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE) 734 putSlaveOnline(c); 735 /* Note: this command does not reply anything! */ 736 return; 737 } else if (!strcasecmp(c->argv[j]->ptr,"getack")) { 738 /* REPLCONF GETACK is used in order to request an ACK ASAP 739 * to the slave. */ 740 if (server.masterhost && server.master) replicationSendAck(); 741 /* Note: this command does not reply anything! */ 742 } else { 743 addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", 744 (char*)c->argv[j]->ptr); 745 return; 746 } 747 } 748 addReply(c,shared.ok); 749 } 750 751 /* This function puts a slave in the online state, and should be called just 752 * after a slave received the RDB file for the initial synchronization, and 753 * we are finally ready to send the incremental stream of commands. 754 * 755 * It does a few things: 756 * 757 * 1) Put the slave in ONLINE state (useless when the function is called 758 * because state is already ONLINE but repl_put_online_on_ack is true). 759 * 2) Make sure the writable event is re-installed, since calling the SYNC 760 * command disables it, so that we can accumulate output buffer without 761 * sending it to the slave. 762 * 3) Update the count of good slaves. */ 763 void putSlaveOnline(client *slave) { 764 slave->replstate = SLAVE_STATE_ONLINE; 765 slave->repl_put_online_on_ack = 0; 766 slave->repl_ack_time = server.unixtime; /* Prevent false timeout. 
*/ 767 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, 768 sendReplyToClient, slave) == AE_ERR) { 769 serverLog(LL_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno)); 770 freeClient(slave); 771 return; 772 } 773 refreshGoodSlavesCount(); 774 serverLog(LL_NOTICE,"Synchronization with slave %s succeeded", 775 replicationGetSlaveName(slave)); 776 } 777 778 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { 779 client *slave = privdata; 780 UNUSED(el); 781 UNUSED(mask); 782 char buf[PROTO_IOBUF_LEN]; 783 ssize_t nwritten, buflen; 784 785 /* Before sending the RDB file, we send the preamble as configured by the 786 * replication process. Currently the preamble is just the bulk count of 787 * the file in the form "$<length>\r\n". */ 788 if (slave->replpreamble) { 789 nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble)); 790 if (nwritten == -1) { 791 serverLog(LL_VERBOSE,"Write error sending RDB preamble to slave: %s", 792 strerror(errno)); 793 freeClient(slave); 794 return; 795 } 796 server.stat_net_output_bytes += nwritten; 797 sdsrange(slave->replpreamble,nwritten,-1); 798 if (sdslen(slave->replpreamble) == 0) { 799 sdsfree(slave->replpreamble); 800 slave->replpreamble = NULL; 801 /* fall through sending data. */ 802 } else { 803 return; 804 } 805 } 806 807 /* If the preamble was already transfered, send the RDB bulk data. */ 808 lseek(slave->repldbfd,slave->repldboff,SEEK_SET); 809 buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN); 810 if (buflen <= 0) { 811 serverLog(LL_WARNING,"Read error sending DB to slave: %s", 812 (buflen == 0) ? 
"premature EOF" : strerror(errno)); 813 freeClient(slave); 814 return; 815 } 816 if ((nwritten = write(fd,buf,buflen)) == -1) { 817 if (errno != EAGAIN) { 818 serverLog(LL_WARNING,"Write error sending DB to slave: %s", 819 strerror(errno)); 820 freeClient(slave); 821 } 822 return; 823 } 824 slave->repldboff += nwritten; 825 server.stat_net_output_bytes += nwritten; 826 if (slave->repldboff == slave->repldbsize) { 827 close(slave->repldbfd); 828 slave->repldbfd = -1; 829 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 830 putSlaveOnline(slave); 831 } 832 } 833 834 /* This function is called at the end of every background saving, 835 * or when the replication RDB transfer strategy is modified from 836 * disk to socket or the other way around. 837 * 838 * The goal of this function is to handle slaves waiting for a successful 839 * background saving in order to perform non-blocking synchronization, and 840 * to schedule a new BGSAVE if there are slaves that attached while a 841 * BGSAVE was in progress, but it was not a good one for replication (no 842 * other slave was accumulating differences). 843 * 844 * The argument bgsaveerr is C_OK if the background saving succeeded 845 * otherwise C_ERR is passed to the function. 846 * The 'type' argument is the type of the child that terminated 847 * (if it had a disk or socket target). */ 848 void updateSlavesWaitingBgsave(int bgsaveerr, int type) { 849 listNode *ln; 850 int startbgsave = 0; 851 int mincapa = -1; 852 listIter li; 853 854 listRewind(server.slaves,&li); 855 while((ln = listNext(&li))) { 856 client *slave = ln->value; 857 858 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { 859 startbgsave = 1; 860 mincapa = (mincapa == -1) ? slave->slave_capa : 861 (mincapa & slave->slave_capa); 862 } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { 863 struct redis_stat buf; 864 865 /* If this was an RDB on disk save, we have to prepare to send 866 * the RDB from disk to the slave socket. 
Otherwise if this was 867 * already an RDB -> Slaves socket transfer, used in the case of 868 * diskless replication, our work is trivial, we can just put 869 * the slave online. */ 870 if (type == RDB_CHILD_TYPE_SOCKET) { 871 serverLog(LL_NOTICE, 872 "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming", 873 replicationGetSlaveName(slave)); 874 /* Note: we wait for a REPLCONF ACK message from slave in 875 * order to really put it online (install the write handler 876 * so that the accumulated data can be transfered). However 877 * we change the replication state ASAP, since our slave 878 * is technically online now. */ 879 slave->replstate = SLAVE_STATE_ONLINE; 880 slave->repl_put_online_on_ack = 1; 881 slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */ 882 } else { 883 if (bgsaveerr != C_OK) { 884 freeClient(slave); 885 serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error"); 886 continue; 887 } 888 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || 889 redis_fstat(slave->repldbfd,&buf) == -1) { 890 freeClient(slave); 891 serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); 892 continue; 893 } 894 slave->repldboff = 0; 895 slave->repldbsize = buf.st_size; 896 slave->replstate = SLAVE_STATE_SEND_BULK; 897 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", 898 (unsigned long long) slave->repldbsize); 899 900 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 901 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { 902 freeClient(slave); 903 continue; 904 } 905 } 906 } 907 } 908 if (startbgsave) startBgsaveForReplication(mincapa); 909 } 910 911 /* ----------------------------------- SLAVE -------------------------------- */ 912 913 /* Returns 1 if the given replication state is a handshake state, 914 * 0 otherwise. 
*/ 915 int slaveIsInHandshakeState(void) { 916 return server.repl_state >= REPL_STATE_RECEIVE_PONG && 917 server.repl_state <= REPL_STATE_RECEIVE_PSYNC; 918 } 919 920 /* Avoid the master to detect the slave is timing out while loading the 921 * RDB file in initial synchronization. We send a single newline character 922 * that is valid protocol but is guaranteed to either be sent entierly or 923 * not, since the byte is indivisible. 924 * 925 * The function is called in two contexts: while we flush the current 926 * data with emptyDb(), and while we load the new data received as an 927 * RDB file from the master. */ 928 void replicationSendNewlineToMaster(void) { 929 static time_t newline_sent; 930 if (time(NULL) != newline_sent) { 931 newline_sent = time(NULL); 932 if (write(server.repl_transfer_s,"\n",1) == -1) { 933 /* Pinging back in this stage is best-effort. */ 934 } 935 } 936 } 937 938 /* Callback used by emptyDb() while flushing away old data to load 939 * the new dataset received by the master. */ 940 void replicationEmptyDbCallback(void *privdata) { 941 UNUSED(privdata); 942 replicationSendNewlineToMaster(); 943 } 944 945 /* Once we have a link with the master and the synchroniziation was 946 * performed, this function materializes the master client we store 947 * at server.master, starting from the specified file descriptor. */ 948 void replicationCreateMasterClient(int fd) { 949 server.master = createClient(fd); 950 server.master->flags |= CLIENT_MASTER; 951 server.master->authenticated = 1; 952 server.repl_state = REPL_STATE_CONNECTED; 953 server.master->reploff = server.repl_master_initial_offset; 954 memcpy(server.master->replrunid, server.repl_master_runid, 955 sizeof(server.repl_master_runid)); 956 /* If master offset is set to -1, this master is old and is not 957 * PSYNC capable, so we flag it accordingly. 
*/ 958 if (server.master->reploff == -1) 959 server.master->flags |= CLIENT_PRE_PSYNC; 960 } 961 962 /* Asynchronously read the SYNC payload we receive from a master */ 963 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ 964 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { 965 char buf[4096]; 966 ssize_t nread, readlen; 967 off_t left; 968 UNUSED(el); 969 UNUSED(privdata); 970 UNUSED(mask); 971 972 /* Static vars used to hold the EOF mark, and the last bytes received 973 * form the server: when they match, we reached the end of the transfer. */ 974 static char eofmark[CONFIG_RUN_ID_SIZE]; 975 static char lastbytes[CONFIG_RUN_ID_SIZE]; 976 static int usemark = 0; 977 978 /* If repl_transfer_size == -1 we still have to read the bulk length 979 * from the master reply. */ 980 if (server.repl_transfer_size == -1) { 981 if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { 982 serverLog(LL_WARNING, 983 "I/O error reading bulk count from MASTER: %s", 984 strerror(errno)); 985 goto error; 986 } 987 988 if (buf[0] == '-') { 989 serverLog(LL_WARNING, 990 "MASTER aborted replication with an error: %s", 991 buf+1); 992 goto error; 993 } else if (buf[0] == '\0') { 994 /* At this stage just a newline works as a PING in order to take 995 * the connection live. So we refresh our last interaction 996 * timestamp. */ 997 server.repl_transfer_lastio = server.unixtime; 998 return; 999 } else if (buf[0] != '$') { 1000 serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); 1001 goto error; 1002 } 1003 1004 /* There are two possible forms for the bulk payload. One is the 1005 * usual $<count> bulk format. The other is used for diskless transfers 1006 * when the master does not know beforehand the size of the file to 1007 * transfer. 
In the latter case, the following format is used: 1008 * 1009 * $EOF:<40 bytes delimiter> 1010 * 1011 * At the end of the file the announced delimiter is transmitted. The 1012 * delimiter is long and random enough that the probability of a 1013 * collision with the actual file content can be ignored. */ 1014 if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) { 1015 usemark = 1; 1016 memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE); 1017 memset(lastbytes,0,CONFIG_RUN_ID_SIZE); 1018 /* Set any repl_transfer_size to avoid entering this code path 1019 * at the next call. */ 1020 server.repl_transfer_size = 0; 1021 serverLog(LL_NOTICE, 1022 "MASTER <-> SLAVE sync: receiving streamed RDB from master"); 1023 } else { 1024 usemark = 0; 1025 server.repl_transfer_size = strtol(buf+1,NULL,10); 1026 serverLog(LL_NOTICE, 1027 "MASTER <-> SLAVE sync: receiving %lld bytes from master", 1028 (long long) server.repl_transfer_size); 1029 } 1030 return; 1031 } 1032 1033 /* Read bulk data */ 1034 if (usemark) { 1035 readlen = sizeof(buf); 1036 } else { 1037 left = server.repl_transfer_size - server.repl_transfer_read; 1038 readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); 1039 } 1040 1041 nread = read(fd,buf,readlen); 1042 if (nread <= 0) { 1043 serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", 1044 (nread == -1) ? strerror(errno) : "connection lost"); 1045 cancelReplicationHandshake(); 1046 return; 1047 } 1048 server.stat_net_input_bytes += nread; 1049 1050 /* When a mark is used, we want to detect EOF asap in order to avoid 1051 * writing the EOF mark into the file... 
*/ 1052 int eof_reached = 0; 1053 1054 if (usemark) { 1055 /* Update the last bytes array, and check if it matches our delimiter.*/ 1056 if (nread >= CONFIG_RUN_ID_SIZE) { 1057 memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE); 1058 } else { 1059 int rem = CONFIG_RUN_ID_SIZE-nread; 1060 memmove(lastbytes,lastbytes+nread,rem); 1061 memcpy(lastbytes+rem,buf,nread); 1062 } 1063 if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; 1064 } 1065 1066 server.repl_transfer_lastio = server.unixtime; 1067 if (write(server.repl_transfer_fd,buf,nread) != nread) { 1068 serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); 1069 goto error; 1070 } 1071 server.repl_transfer_read += nread; 1072 1073 /* Delete the last 40 bytes from the file if we reached EOF. */ 1074 if (usemark && eof_reached) { 1075 if (ftruncate(server.repl_transfer_fd, 1076 server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) 1077 { 1078 serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); 1079 goto error; 1080 } 1081 } 1082 1083 /* Sync data on disk from time to time, otherwise at the end of the transfer 1084 * we may suffer a big delay as the memory buffers are copied into the 1085 * actual disk. 
*/ 1086 if (server.repl_transfer_read >= 1087 server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) 1088 { 1089 off_t sync_size = server.repl_transfer_read - 1090 server.repl_transfer_last_fsync_off; 1091 rdb_fsync_range(server.repl_transfer_fd, 1092 server.repl_transfer_last_fsync_off, sync_size); 1093 server.repl_transfer_last_fsync_off += sync_size; 1094 } 1095 1096 /* Check if the transfer is now complete */ 1097 if (!usemark) { 1098 if (server.repl_transfer_read == server.repl_transfer_size) 1099 eof_reached = 1; 1100 } 1101 1102 if (eof_reached) { 1103 if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { 1104 serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); 1105 cancelReplicationHandshake(); 1106 return; 1107 } 1108 serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data"); 1109 signalFlushedDb(-1); 1110 emptyDb(replicationEmptyDbCallback); 1111 /* Before loading the DB into memory we need to delete the readable 1112 * handler, otherwise it will get called recursively since 1113 * rdbLoad() will call the event loop to process events from time to 1114 * time for non blocking loading. */ 1115 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); 1116 serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); 1117 if (rdbLoad(server.rdb_filename) != C_OK) { 1118 serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); 1119 cancelReplicationHandshake(); 1120 return; 1121 } 1122 /* Final setup of the connected slave <- master link */ 1123 zfree(server.repl_transfer_tmpfile); 1124 close(server.repl_transfer_fd); 1125 replicationCreateMasterClient(server.repl_transfer_s); 1126 serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success"); 1127 /* Restart the AOF subsystem now that we finished the sync. 
This 1128 * will trigger an AOF rewrite, and when done will start appending 1129 * to the new file. */ 1130 if (server.aof_state != AOF_OFF) { 1131 int retry = 10; 1132 1133 stopAppendOnly(); 1134 while (retry-- && startAppendOnly() == C_ERR) { 1135 serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second."); 1136 sleep(1); 1137 } 1138 if (!retry) { 1139 serverLog(LL_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now."); 1140 exit(1); 1141 } 1142 } 1143 } 1144 1145 return; 1146 1147 error: 1148 cancelReplicationHandshake(); 1149 return; 1150 } 1151 1152 /* Send a synchronous command to the master. Used to send AUTH and 1153 * REPLCONF commands before starting the replication with SYNC. 1154 * 1155 * The command returns an sds string representing the result of the 1156 * operation. On error the first byte is a "-". 1157 */ 1158 #define SYNC_CMD_READ (1<<0) 1159 #define SYNC_CMD_WRITE (1<<1) 1160 #define SYNC_CMD_FULL (SYNC_CMD_READ|SYNC_CMD_WRITE) 1161 char *sendSynchronousCommand(int flags, int fd, ...) { 1162 1163 /* Create the command to send to the master, we use simple inline 1164 * protocol for simplicity as currently we only send simple strings. */ 1165 if (flags & SYNC_CMD_WRITE) { 1166 char *arg; 1167 va_list ap; 1168 sds cmd = sdsempty(); 1169 va_start(ap,fd); 1170 1171 while(1) { 1172 arg = va_arg(ap, char*); 1173 if (arg == NULL) break; 1174 1175 if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1); 1176 cmd = sdscat(cmd,arg); 1177 } 1178 cmd = sdscatlen(cmd,"\r\n",2); 1179 1180 /* Transfer command to the server. */ 1181 if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) 1182 == -1) 1183 { 1184 sdsfree(cmd); 1185 return sdscatprintf(sdsempty(),"-Writing to master: %s", 1186 strerror(errno)); 1187 } 1188 sdsfree(cmd); 1189 va_end(ap); 1190 } 1191 1192 /* Read the reply from the server. 
*/ 1193 if (flags & SYNC_CMD_READ) { 1194 char buf[256]; 1195 1196 if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) 1197 == -1) 1198 { 1199 return sdscatprintf(sdsempty(),"-Reading from master: %s", 1200 strerror(errno)); 1201 } 1202 server.repl_transfer_lastio = server.unixtime; 1203 return sdsnew(buf); 1204 } 1205 return NULL; 1206 } 1207 1208 /* Try a partial resynchronization with the master if we are about to reconnect. 1209 * If there is no cached master structure, at least try to issue a 1210 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC 1211 * command in order to obtain the master run id and the master replication 1212 * global offset. 1213 * 1214 * This function is designed to be called from syncWithMaster(), so the 1215 * following assumptions are made: 1216 * 1217 * 1) We pass the function an already connected socket "fd". 1218 * 2) This function does not close the file descriptor "fd". However in case 1219 * of successful partial resynchronization, the function will reuse 1220 * 'fd' as file descriptor of the server.master client structure. 1221 * 1222 * The function is split in two halves: if read_reply is 0, the function 1223 * writes the PSYNC command on the socket, and a new function call is 1224 * needed, with read_reply set to 1, in order to read the reply of the 1225 * command. This is useful in order to support non blocking operations, so 1226 * that we write, return into the event loop, and read when there are data. 1227 * 1228 * When read_reply is 0 the function returns PSYNC_WRITE_ERR if there 1229 * was a write error, or PSYNC_WAIT_REPLY to signal we need another call 1230 * with read_reply set to 1. However even when read_reply is set to 1 1231 * the function may return PSYNC_WAIT_REPLY again to signal there were 1232 * insufficient data to read to complete its work. We should re-enter 1233 * into the event loop and wait in such a case. 
1234 * 1235 * The function returns: 1236 * 1237 * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue. 1238 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed. 1239 * In this case the master run_id and global replication 1240 * offset is saved. 1241 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and 1242 * the caller should fall back to SYNC. 1243 * PSYNC_WRITE_ERR: There was an error writing the command to the socket. 1244 * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1. 1245 * 1246 * Notable side effects: 1247 * 1248 * 1) As a side effect of the function call the function removes the readable 1249 * event handler from "fd", unless the return value is PSYNC_WAIT_REPLY. 1250 * 2) server.repl_master_initial_offset is set to the right value according 1251 * to the master reply. This will be used to populate the 'server.master' 1252 * structure replication offset. 1253 */ 1254 1255 #define PSYNC_WRITE_ERROR 0 1256 #define PSYNC_WAIT_REPLY 1 1257 #define PSYNC_CONTINUE 2 1258 #define PSYNC_FULLRESYNC 3 1259 #define PSYNC_NOT_SUPPORTED 4 1260 int slaveTryPartialResynchronization(int fd, int read_reply) { 1261 char *psync_runid; 1262 char psync_offset[32]; 1263 sds reply; 1264 1265 /* Writing half */ 1266 if (!read_reply) { 1267 /* Initially set repl_master_initial_offset to -1 to mark the current 1268 * master run_id and offset as not valid. Later if we'll be able to do 1269 * a FULL resync using the PSYNC command we'll set the offset at the 1270 * right value, so that this information will be propagated to the 1271 * client structure representing the master into server.master. 
*/ 1272 server.repl_master_initial_offset = -1; 1273 1274 if (server.cached_master) { 1275 psync_runid = server.cached_master->replrunid; 1276 snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); 1277 serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); 1278 } else { 1279 serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)"); 1280 psync_runid = "?"; 1281 memcpy(psync_offset,"-1",3); 1282 } 1283 1284 /* Issue the PSYNC command */ 1285 reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL); 1286 if (reply != NULL) { 1287 serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply); 1288 sdsfree(reply); 1289 aeDeleteFileEvent(server.el,fd,AE_READABLE); 1290 return PSYNC_WRITE_ERROR; 1291 } 1292 return PSYNC_WAIT_REPLY; 1293 } 1294 1295 /* Reading half */ 1296 reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1297 if (sdslen(reply) == 0) { 1298 /* The master may send empty newlines after it receives PSYNC 1299 * and before to reply, just to keep the connection alive. */ 1300 sdsfree(reply); 1301 return PSYNC_WAIT_REPLY; 1302 } 1303 1304 aeDeleteFileEvent(server.el,fd,AE_READABLE); 1305 1306 if (!strncmp(reply,"+FULLRESYNC",11)) { 1307 char *runid = NULL, *offset = NULL; 1308 1309 /* FULL RESYNC, parse the reply in order to extract the run id 1310 * and the replication offset. */ 1311 runid = strchr(reply,' '); 1312 if (runid) { 1313 runid++; 1314 offset = strchr(runid,' '); 1315 if (offset) offset++; 1316 } 1317 if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) { 1318 serverLog(LL_WARNING, 1319 "Master replied with wrong +FULLRESYNC syntax."); 1320 /* This is an unexpected condition, actually the +FULLRESYNC 1321 * reply means that the master supports PSYNC, but the reply 1322 * format seems wrong. To stay safe we blank the master 1323 * runid to make sure next PSYNCs will fail. 
*/ 1324 memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1); 1325 } else { 1326 memcpy(server.repl_master_runid, runid, offset-runid-1); 1327 server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0'; 1328 server.repl_master_initial_offset = strtoll(offset,NULL,10); 1329 serverLog(LL_NOTICE,"Full resync from master: %s:%lld", 1330 server.repl_master_runid, 1331 server.repl_master_initial_offset); 1332 } 1333 /* We are going to full resync, discard the cached master structure. */ 1334 replicationDiscardCachedMaster(); 1335 sdsfree(reply); 1336 return PSYNC_FULLRESYNC; 1337 } 1338 1339 if (!strncmp(reply,"+CONTINUE",9)) { 1340 /* Partial resync was accepted, set the replication state accordingly */ 1341 serverLog(LL_NOTICE, 1342 "Successful partial resynchronization with master."); 1343 sdsfree(reply); 1344 replicationResurrectCachedMaster(fd); 1345 return PSYNC_CONTINUE; 1346 } 1347 1348 /* If we reach this point we received either an error since the master does 1349 * not understand PSYNC, or an unexpected reply from the master. 1350 * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */ 1351 1352 if (strncmp(reply,"-ERR",4)) { 1353 /* If it's not an error, log the unexpected event. */ 1354 serverLog(LL_WARNING, 1355 "Unexpected reply to PSYNC from master: %s", reply); 1356 } else { 1357 serverLog(LL_NOTICE, 1358 "Master does not support PSYNC or is in " 1359 "error state (reply: %s)", reply); 1360 } 1361 sdsfree(reply); 1362 replicationDiscardCachedMaster(); 1363 return PSYNC_NOT_SUPPORTED; 1364 } 1365 1366 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { 1367 char tmpfile[256], *err = NULL; 1368 int dfd, maxtries = 5; 1369 int sockerr = 0, psync_result; 1370 socklen_t errlen = sizeof(sockerr); 1371 UNUSED(el); 1372 UNUSED(privdata); 1373 UNUSED(mask); 1374 1375 /* If this event fired after the user turned the instance into a master 1376 * with SLAVEOF NO ONE we must just return ASAP. 
*/ 1377 if (server.repl_state == REPL_STATE_NONE) { 1378 close(fd); 1379 return; 1380 } 1381 1382 /* Check for errors in the socket. */ 1383 if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) 1384 sockerr = errno; 1385 if (sockerr) { 1386 serverLog(LL_WARNING,"Error condition on socket for SYNC: %s", 1387 strerror(sockerr)); 1388 goto error; 1389 } 1390 1391 /* Send a PING to check the master is able to reply without errors. */ 1392 if (server.repl_state == REPL_STATE_CONNECTING) { 1393 serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event."); 1394 /* Delete the writable event so that the readable event remains 1395 * registered and we can wait for the PONG reply. */ 1396 aeDeleteFileEvent(server.el,fd,AE_WRITABLE); 1397 server.repl_state = REPL_STATE_RECEIVE_PONG; 1398 /* Send the PING, don't check for errors at all, we have the timeout 1399 * that will take care about this. */ 1400 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL); 1401 if (err) goto write_error; 1402 return; 1403 } 1404 1405 /* Receive the PONG command. */ 1406 if (server.repl_state == REPL_STATE_RECEIVE_PONG) { 1407 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1408 1409 /* We accept only two replies as valid, a positive +PONG reply 1410 * (we just check for "+") or an authentication error. 1411 * Note that older versions of Redis replied with "operation not 1412 * permitted" instead of using a proper error code, so we test 1413 * both. */ 1414 if (err[0] != '+' && 1415 strncmp(err,"-NOAUTH",7) != 0 && 1416 strncmp(err,"-ERR operation not permitted",28) != 0) 1417 { 1418 serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err); 1419 sdsfree(err); 1420 goto error; 1421 } else { 1422 serverLog(LL_NOTICE, 1423 "Master replied to PING, replication can continue..."); 1424 } 1425 sdsfree(err); 1426 server.repl_state = REPL_STATE_SEND_AUTH; 1427 } 1428 1429 /* AUTH with the master if required. 
*/ 1430 if (server.repl_state == REPL_STATE_SEND_AUTH) { 1431 if (server.masterauth) { 1432 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL); 1433 if (err) goto write_error; 1434 server.repl_state = REPL_STATE_RECEIVE_AUTH; 1435 return; 1436 } else { 1437 server.repl_state = REPL_STATE_SEND_PORT; 1438 } 1439 } 1440 1441 /* Receive AUTH reply. */ 1442 if (server.repl_state == REPL_STATE_RECEIVE_AUTH) { 1443 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1444 if (err[0] == '-') { 1445 serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err); 1446 sdsfree(err); 1447 goto error; 1448 } 1449 sdsfree(err); 1450 server.repl_state = REPL_STATE_SEND_PORT; 1451 } 1452 1453 /* Set the slave port, so that Master's INFO command can list the 1454 * slave listening port correctly. */ 1455 if (server.repl_state == REPL_STATE_SEND_PORT) { 1456 sds port = sdsfromlonglong(server.port); 1457 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", 1458 "listening-port",port, NULL); 1459 sdsfree(port); 1460 if (err) goto write_error; 1461 sdsfree(err); 1462 server.repl_state = REPL_STATE_RECEIVE_PORT; 1463 return; 1464 } 1465 1466 /* Receive REPLCONF listening-port reply. */ 1467 if (server.repl_state == REPL_STATE_RECEIVE_PORT) { 1468 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1469 /* Ignore the error if any, not all the Redis versions support 1470 * REPLCONF listening-port. */ 1471 if (err[0] == '-') { 1472 serverLog(LL_NOTICE,"(Non critical) Master does not understand " 1473 "REPLCONF listening-port: %s", err); 1474 } 1475 sdsfree(err); 1476 server.repl_state = REPL_STATE_SEND_CAPA; 1477 } 1478 1479 /* Inform the master of our capabilities. While we currently send 1480 * just one capability, it is possible to chain new capabilities here 1481 * in the form of REPLCONF capa X capa Y capa Z ... 1482 * The master will ignore capabilities it does not understand. 
*/ 1483 if (server.repl_state == REPL_STATE_SEND_CAPA) { 1484 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", 1485 "capa","eof",NULL); 1486 if (err) goto write_error; 1487 sdsfree(err); 1488 server.repl_state = REPL_STATE_RECEIVE_CAPA; 1489 return; 1490 } 1491 1492 /* Receive CAPA reply. */ 1493 if (server.repl_state == REPL_STATE_RECEIVE_CAPA) { 1494 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1495 /* Ignore the error if any, not all the Redis versions support 1496 * REPLCONF capa. */ 1497 if (err[0] == '-') { 1498 serverLog(LL_NOTICE,"(Non critical) Master does not understand " 1499 "REPLCONF capa: %s", err); 1500 } 1501 sdsfree(err); 1502 server.repl_state = REPL_STATE_SEND_PSYNC; 1503 } 1504 1505 /* Try a partial resynchonization. If we don't have a cached master 1506 * slaveTryPartialResynchronization() will at least try to use PSYNC 1507 * to start a full resynchronization so that we get the master run id 1508 * and the global offset, to try a partial resync at the next 1509 * reconnection attempt. */ 1510 if (server.repl_state == REPL_STATE_SEND_PSYNC) { 1511 if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) { 1512 err = sdsnew("Write error sending the PSYNC command."); 1513 goto write_error; 1514 } 1515 server.repl_state = REPL_STATE_RECEIVE_PSYNC; 1516 return; 1517 } 1518 1519 /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */ 1520 if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) { 1521 serverLog(LL_WARNING,"syncWithMaster(): state machine error, " 1522 "state should be RECEIVE_PSYNC but is %d", 1523 server.repl_state); 1524 goto error; 1525 } 1526 1527 psync_result = slaveTryPartialResynchronization(fd,1); 1528 if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ 1529 1530 /* Note: if PSYNC does not return WAIT_REPLY, it will take care of 1531 * uninstalling the read handler from the file descriptor. 
*/ 1532 1533 if (psync_result == PSYNC_CONTINUE) { 1534 serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization."); 1535 return; 1536 } 1537 1538 /* PSYNC failed or is not supported: we want our slaves to resync with us 1539 * as well, if we have any (chained replication case). The mater may 1540 * transfer us an entirely different data set and we have no way to 1541 * incrementally feed our slaves after that. */ 1542 disconnectSlaves(); /* Force our slaves to resync with us as well. */ 1543 freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ 1544 1545 /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC 1546 * and the server.repl_master_runid and repl_master_initial_offset are 1547 * already populated. */ 1548 if (psync_result == PSYNC_NOT_SUPPORTED) { 1549 serverLog(LL_NOTICE,"Retrying with SYNC..."); 1550 if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { 1551 serverLog(LL_WARNING,"I/O error writing to MASTER: %s", 1552 strerror(errno)); 1553 goto error; 1554 } 1555 } 1556 1557 /* Prepare a suitable temp file for bulk transfer */ 1558 while(maxtries--) { 1559 snprintf(tmpfile,256, 1560 "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); 1561 dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); 1562 if (dfd != -1) break; 1563 sleep(1); 1564 } 1565 if (dfd == -1) { 1566 serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno)); 1567 goto error; 1568 } 1569 1570 /* Setup the non blocking download of the bulk file. 
*/ 1571 if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) 1572 == AE_ERR) 1573 { 1574 serverLog(LL_WARNING, 1575 "Can't create readable event for SYNC: %s (fd=%d)", 1576 strerror(errno),fd); 1577 goto error; 1578 } 1579 1580 server.repl_state = REPL_STATE_TRANSFER; 1581 server.repl_transfer_size = -1; 1582 server.repl_transfer_read = 0; 1583 server.repl_transfer_last_fsync_off = 0; 1584 server.repl_transfer_fd = dfd; 1585 server.repl_transfer_lastio = server.unixtime; 1586 server.repl_transfer_tmpfile = zstrdup(tmpfile); 1587 return; 1588 1589 error: 1590 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1591 close(fd); 1592 server.repl_transfer_s = -1; 1593 server.repl_state = REPL_STATE_CONNECT; 1594 return; 1595 1596 write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */ 1597 serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err); 1598 sdsfree(err); 1599 goto error; 1600 } 1601 1602 int connectWithMaster(void) { 1603 int fd; 1604 1605 fd = anetTcpNonBlockBestEffortBindConnect(NULL, 1606 server.masterhost,server.masterport,NET_FIRST_BIND_ADDR); 1607 if (fd == -1) { 1608 serverLog(LL_WARNING,"Unable to connect to MASTER: %s", 1609 strerror(errno)); 1610 return C_ERR; 1611 } 1612 1613 if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == 1614 AE_ERR) 1615 { 1616 close(fd); 1617 serverLog(LL_WARNING,"Can't create readable event for SYNC"); 1618 return C_ERR; 1619 } 1620 1621 server.repl_transfer_lastio = server.unixtime; 1622 server.repl_transfer_s = fd; 1623 server.repl_state = REPL_STATE_CONNECTING; 1624 return C_OK; 1625 } 1626 1627 /* This function can be called when a non blocking connection is currently 1628 * in progress to undo it. 1629 * Never call this function directly, use cancelReplicationHandshake() instead. 
1630 */ 1631 void undoConnectWithMaster(void) { 1632 int fd = server.repl_transfer_s; 1633 1634 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1635 close(fd); 1636 server.repl_transfer_s = -1; 1637 } 1638 1639 /* Abort the async download of the bulk dataset while SYNC-ing with master. 1640 * Never call this function directly, use cancelReplicationHandshake() instead. 1641 */ 1642 void replicationAbortSyncTransfer(void) { 1643 serverAssert(server.repl_state == REPL_STATE_TRANSFER); 1644 undoConnectWithMaster(); 1645 close(server.repl_transfer_fd); 1646 unlink(server.repl_transfer_tmpfile); 1647 zfree(server.repl_transfer_tmpfile); 1648 } 1649 1650 /* This function aborts a non blocking replication attempt if there is one 1651 * in progress, by canceling the non-blocking connect attempt or 1652 * the initial bulk transfer. 1653 * 1654 * If there was a replication handshake in progress 1 is returned and 1655 * the replication state (server.repl_state) set to REPL_STATE_CONNECT. 1656 * 1657 * Otherwise zero is returned and no operation is perforemd at all. */ 1658 int cancelReplicationHandshake(void) { 1659 if (server.repl_state == REPL_STATE_TRANSFER) { 1660 replicationAbortSyncTransfer(); 1661 server.repl_state = REPL_STATE_CONNECT; 1662 } else if (server.repl_state == REPL_STATE_CONNECTING || 1663 slaveIsInHandshakeState()) 1664 { 1665 undoConnectWithMaster(); 1666 server.repl_state = REPL_STATE_CONNECT; 1667 } else { 1668 return 0; 1669 } 1670 return 1; 1671 } 1672 1673 /* Set replication to the specified master address and port. */ 1674 void replicationSetMaster(char *ip, int port) { 1675 sdsfree(server.masterhost); 1676 server.masterhost = sdsnew(ip); 1677 server.masterport = port; 1678 if (server.master) freeClient(server.master); 1679 disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */ 1680 disconnectSlaves(); /* Force our slaves to resync with us as well. */ 1681 replicationDiscardCachedMaster(); /* Don't try a PSYNC. 
*/ 1682 freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ 1683 cancelReplicationHandshake(); 1684 server.repl_state = REPL_STATE_CONNECT; 1685 server.master_repl_offset = 0; 1686 server.repl_down_since = 0; 1687 } 1688 1689 /* Cancel replication, setting the instance as a master itself. */ 1690 void replicationUnsetMaster(void) { 1691 if (server.masterhost == NULL) return; /* Nothing to do. */ 1692 sdsfree(server.masterhost); 1693 server.masterhost = NULL; 1694 if (server.master) { 1695 if (listLength(server.slaves) == 0) { 1696 /* If this instance is turned into a master and there are no 1697 * slaves, it inherits the replication offset from the master. 1698 * Under certain conditions this makes replicas comparable by 1699 * replication offset to understand what is the most updated. */ 1700 server.master_repl_offset = server.master->reploff; 1701 freeReplicationBacklog(); 1702 } 1703 freeClient(server.master); 1704 } 1705 replicationDiscardCachedMaster(); 1706 cancelReplicationHandshake(); 1707 server.repl_state = REPL_STATE_NONE; 1708 } 1709 1710 /* This function is called when the slave lose the connection with the 1711 * master into an unexpected way. */ 1712 void replicationHandleMasterDisconnection(void) { 1713 server.master = NULL; 1714 server.repl_state = REPL_STATE_CONNECT; 1715 server.repl_down_since = server.unixtime; 1716 /* We lost connection with our master, don't disconnect slaves yet, 1717 * maybe we'll be able to PSYNC with our master later. We'll disconnect 1718 * the slaves only if we'll have to do a full resync with our master. */ 1719 } 1720 1721 void slaveofCommand(client *c) { 1722 /* SLAVEOF is not allowed in cluster mode as replication is automatically 1723 * configured using the current address of the master node. 
*/ 1724 if (server.cluster_enabled) { 1725 addReplyError(c,"SLAVEOF not allowed in cluster mode."); 1726 return; 1727 } 1728 1729 /* The special host/port combination "NO" "ONE" turns the instance 1730 * into a master. Otherwise the new master address is set. */ 1731 if (!strcasecmp(c->argv[1]->ptr,"no") && 1732 !strcasecmp(c->argv[2]->ptr,"one")) { 1733 if (server.masterhost) { 1734 replicationUnsetMaster(); 1735 sds client = catClientInfoString(sdsempty(),c); 1736 serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", 1737 client); 1738 sdsfree(client); 1739 } 1740 } else { 1741 long port; 1742 1743 if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK)) 1744 return; 1745 1746 /* Check if we are already attached to the specified slave */ 1747 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) 1748 && server.masterport == port) { 1749 serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed."); 1750 addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); 1751 return; 1752 } 1753 /* There was no previous master or the user specified a different one, 1754 * we can continue. */ 1755 replicationSetMaster(c->argv[1]->ptr, port); 1756 sds client = catClientInfoString(sdsempty(),c); 1757 serverLog(LL_NOTICE,"SLAVE OF %s:%d enabled (user request from '%s')", 1758 server.masterhost, server.masterport, client); 1759 sdsfree(client); 1760 } 1761 addReply(c,shared.ok); 1762 } 1763 1764 /* ROLE command: provide information about the role of the instance 1765 * (master or slave) and additional information related to replication 1766 * in an easy to process format. 
*/
void roleCommand(client *c) {
    if (server.masterhost != NULL) {
        /* Slave: five elements are emitted, that is the role name, the
         * master host and port, the link state and the master offset
         * (-1 when there is no master client). */
        char *state_name = NULL;

        addReplyMultiBulkLen(c,5);
        addReplyBulkCBuffer(c,"slave",5);
        addReplyBulkCString(c,server.masterhost);
        addReplyLongLong(c,server.masterport);

        if (slaveIsInHandshakeState()) {
            state_name = "handshake";
        } else {
            switch(server.repl_state) {
            case REPL_STATE_NONE:
                state_name = "none";
                break;
            case REPL_STATE_CONNECT:
                state_name = "connect";
                break;
            case REPL_STATE_CONNECTING:
                state_name = "connecting";
                break;
            case REPL_STATE_TRANSFER:
                state_name = "sync";
                break;
            case REPL_STATE_CONNECTED:
                state_name = "connected";
                break;
            default:
                state_name = "unknown";
                break;
            }
        }
        addReplyBulkCString(c,state_name);

        if (server.master)
            addReplyLongLong(c,server.master->reploff);
        else
            addReplyLongLong(c,-1);
        return;
    }

    /* Master: three elements are emitted, that is the role name, the
     * replication offset and one (ip, port, acked offset) entry for each
     * online slave whose peer address can be resolved. */
    listIter iter;
    listNode *node;
    void *deferred_len;
    int listed = 0;

    addReplyMultiBulkLen(c,3);
    addReplyBulkCBuffer(c,"master",6);
    addReplyLongLong(c,server.master_repl_offset);

    /* The number of entries is only known after the scan. */
    deferred_len = addDeferredMultiBulkLength(c);
    listRewind(server.slaves,&iter);
    while ((node = listNext(&iter)) != NULL) {
        client *slave = node->value;
        char ip[NET_IP_STR_LEN];

        if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1 ||
            slave->replstate != SLAVE_STATE_ONLINE)
        {
            continue;
        }
        addReplyMultiBulkLen(c,3);
        addReplyBulkCString(c,ip);
        addReplyBulkLongLong(c,slave->slave_listening_port);
        addReplyBulkLongLong(c,slave->repl_ack_off);
        listed++;
    }
    setDeferredMultiBulkLength(c,deferred_len,listed);
}

/* Send a REPLCONF ACK command to the master to inform it about the current
 * processed offset. If we are not connected with a master, the command has
 * no effects.
*/ 1819 void replicationSendAck(void) { 1820 client *c = server.master; 1821 1822 if (c != NULL) { 1823 c->flags |= CLIENT_MASTER_FORCE_REPLY; 1824 addReplyMultiBulkLen(c,3); 1825 addReplyBulkCString(c,"REPLCONF"); 1826 addReplyBulkCString(c,"ACK"); 1827 addReplyBulkLongLong(c,c->reploff); 1828 c->flags &= ~CLIENT_MASTER_FORCE_REPLY; 1829 } 1830 } 1831 1832 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */ 1833 1834 /* In order to implement partial synchronization we need to be able to cache 1835 * our master's client structure after a transient disconnection. 1836 * It is cached into server.cached_master and flushed away using the following 1837 * functions. */ 1838 1839 /* This function is called by freeClient() in order to cache the master 1840 * client structure instead of destryoing it. freeClient() will return 1841 * ASAP after this function returns, so every action needed to avoid problems 1842 * with a client that is really "suspended" has to be done by this function. 1843 * 1844 * The other functions that will deal with the cached master are: 1845 * 1846 * replicationDiscardCachedMaster() that will make sure to kill the client 1847 * as for some reason we don't want to use it in the future. 1848 * 1849 * replicationResurrectCachedMaster() that is used after a successful PSYNC 1850 * handshake in order to reactivate the cached master. 1851 */ 1852 void replicationCacheMaster(client *c) { 1853 serverAssert(server.master != NULL && server.cached_master == NULL); 1854 serverLog(LL_NOTICE,"Caching the disconnected master state."); 1855 1856 /* Unlink the client from the server structures. */ 1857 unlinkClient(c); 1858 1859 /* Save the master. Server.master will be set to null later by 1860 * replicationHandleMasterDisconnection(). */ 1861 server.cached_master = server.master; 1862 1863 /* Invalidate the Peer ID cache. 
*/ 1864 if (c->peerid) { 1865 sdsfree(c->peerid); 1866 c->peerid = NULL; 1867 } 1868 1869 /* Caching the master happens instead of the actual freeClient() call, 1870 * so make sure to adjust the replication state. This function will 1871 * also set server.master to NULL. */ 1872 replicationHandleMasterDisconnection(); 1873 } 1874 1875 /* Free a cached master, called when there are no longer the conditions for 1876 * a partial resync on reconnection. */ 1877 void replicationDiscardCachedMaster(void) { 1878 if (server.cached_master == NULL) return; 1879 1880 serverLog(LL_NOTICE,"Discarding previously cached master state."); 1881 server.cached_master->flags &= ~CLIENT_MASTER; 1882 freeClient(server.cached_master); 1883 server.cached_master = NULL; 1884 } 1885 1886 /* Turn the cached master into the current master, using the file descriptor 1887 * passed as argument as the socket for the new master. 1888 * 1889 * This function is called when successfully setup a partial resynchronization 1890 * so the stream of data that we'll receive will start from were this 1891 * master left. */ 1892 void replicationResurrectCachedMaster(int newfd) { 1893 server.master = server.cached_master; 1894 server.cached_master = NULL; 1895 server.master->fd = newfd; 1896 server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP); 1897 server.master->authenticated = 1; 1898 server.master->lastinteraction = server.unixtime; 1899 server.repl_state = REPL_STATE_CONNECTED; 1900 1901 /* Re-add to the list of clients. */ 1902 listAddNodeTail(server.clients,server.master); 1903 if (aeCreateFileEvent(server.el, newfd, AE_READABLE, 1904 readQueryFromClient, server.master)) { 1905 serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); 1906 freeClientAsync(server.master); /* Close ASAP. */ 1907 } 1908 1909 /* We may also need to install the write handler as well if there is 1910 * pending data in the write buffers. 
*/ 1911 if (clientHasPendingReplies(server.master)) { 1912 if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, 1913 sendReplyToClient, server.master)) { 1914 serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); 1915 freeClientAsync(server.master); /* Close ASAP. */ 1916 } 1917 } 1918 } 1919 1920 /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */ 1921 1922 /* This function counts the number of slaves with lag <= min-slaves-max-lag. 1923 * If the option is active, the server will prevent writes if there are not 1924 * enough connected slaves with the specified lag (or less). */ 1925 void refreshGoodSlavesCount(void) { 1926 listIter li; 1927 listNode *ln; 1928 int good = 0; 1929 1930 if (!server.repl_min_slaves_to_write || 1931 !server.repl_min_slaves_max_lag) return; 1932 1933 listRewind(server.slaves,&li); 1934 while((ln = listNext(&li))) { 1935 client *slave = ln->value; 1936 time_t lag = server.unixtime - slave->repl_ack_time; 1937 1938 if (slave->replstate == SLAVE_STATE_ONLINE && 1939 lag <= server.repl_min_slaves_max_lag) good++; 1940 } 1941 server.repl_good_slaves_count = good; 1942 } 1943 1944 /* ----------------------- REPLICATION SCRIPT CACHE -------------------------- 1945 * The goal of this code is to keep track of scripts already sent to every 1946 * connected slave, in order to be able to replicate EVALSHA as it is without 1947 * translating it to EVAL every time it is possible. 1948 * 1949 * We use a capped collection implemented by a hash table for fast lookup 1950 * of scripts we can send as EVALSHA, plus a linked list that is used for 1951 * eviction of the oldest entry when the max number of items is reached. 
1952 * 1953 * We don't care about taking a different cache for every different slave 1954 * since to fill the cache again is not very costly, the goal of this code 1955 * is to avoid that the same big script is trasmitted a big number of times 1956 * per second wasting bandwidth and processor speed, but it is not a problem 1957 * if we need to rebuild the cache from scratch from time to time, every used 1958 * script will need to be transmitted a single time to reappear in the cache. 1959 * 1960 * This is how the system works: 1961 * 1962 * 1) Every time a new slave connects, we flush the whole script cache. 1963 * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without 1964 * trying to convert EVAL into EVALSHA specifically for slaves. 1965 * 3) Every time we trasmit a script as EVAL to the slaves, we also add the 1966 * corresponding SHA1 of the script into the cache as we are sure every 1967 * slave knows about the script starting from now. 1968 * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves 1969 * and at the same time flush the script cache. 1970 * 5) When the last slave disconnects, flush the cache. 1971 * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded 1972 * in the master sometimes. 1973 */ 1974 1975 /* Initialize the script cache, only called at startup. */ 1976 void replicationScriptCacheInit(void) { 1977 server.repl_scriptcache_size = 10000; 1978 server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL); 1979 server.repl_scriptcache_fifo = listCreate(); 1980 } 1981 1982 /* Empty the script cache. Should be called every time we are no longer sure 1983 * that every slave knows about all the scripts in our set, or when the 1984 * current AOF "context" is no longer aware of the script. In general we 1985 * should flush the cache: 1986 * 1987 * 1) Every time a new slave reconnects to this master and performs a 1988 * full SYNC (PSYNC does not require flushing). 
1989 * 2) Every time an AOF rewrite is performed. 1990 * 3) Every time we are left without slaves at all, and AOF is off, in order 1991 * to reclaim otherwise unused memory. 1992 */ 1993 void replicationScriptCacheFlush(void) { 1994 dictEmpty(server.repl_scriptcache_dict,NULL); 1995 listRelease(server.repl_scriptcache_fifo); 1996 server.repl_scriptcache_fifo = listCreate(); 1997 } 1998 1999 /* Add an entry into the script cache, if we reach max number of entries the 2000 * oldest is removed from the list. */ 2001 void replicationScriptCacheAdd(sds sha1) { 2002 int retval; 2003 sds key = sdsdup(sha1); 2004 2005 /* Evict oldest. */ 2006 if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size) 2007 { 2008 listNode *ln = listLast(server.repl_scriptcache_fifo); 2009 sds oldest = listNodeValue(ln); 2010 2011 retval = dictDelete(server.repl_scriptcache_dict,oldest); 2012 serverAssert(retval == DICT_OK); 2013 listDelNode(server.repl_scriptcache_fifo,ln); 2014 } 2015 2016 /* Add current. */ 2017 retval = dictAdd(server.repl_scriptcache_dict,key,NULL); 2018 listAddNodeHead(server.repl_scriptcache_fifo,key); 2019 serverAssert(retval == DICT_OK); 2020 } 2021 2022 /* Returns non-zero if the specified entry exists inside the cache, that is, 2023 * if all the slaves are aware of this script SHA1. */ 2024 int replicationScriptCacheExists(sds sha1) { 2025 return dictFind(server.repl_scriptcache_dict,sha1) != NULL; 2026 } 2027 2028 /* ----------------------- SYNCHRONOUS REPLICATION -------------------------- 2029 * Redis synchronous replication design can be summarized in points: 2030 * 2031 * - Redis masters have a global replication offset, used by PSYNC. 2032 * - Master increment the offset every time new commands are sent to slaves. 2033 * - Slaves ping back masters with the offset processed so far. 
2034 * 2035 * So synchronous replication adds a new WAIT command in the form: 2036 * 2037 * WAIT <num_replicas> <milliseconds_timeout> 2038 * 2039 * That returns the number of replicas that processed the query when 2040 * we finally have at least num_replicas, or when the timeout was 2041 * reached. 2042 * 2043 * The command is implemented in this way: 2044 * 2045 * - Every time a client processes a command, we remember the replication 2046 * offset after sending that command to the slaves. 2047 * - When WAIT is called, we ask slaves to send an acknowledgement ASAP. 2048 * The client is blocked at the same time (see blocked.c). 2049 * - Once we receive enough ACKs for a given offset or when the timeout 2050 * is reached, the WAIT command is unblocked and the reply sent to the 2051 * client. 2052 */ 2053 2054 /* This just set a flag so that we broadcast a REPLCONF GETACK command 2055 * to all the slaves in the beforeSleep() function. Note that this way 2056 * we "group" all the clients that want to wait for synchronouns replication 2057 * in a given event loop iteration, and send a single GETACK for them all. */ 2058 void replicationRequestAckFromSlaves(void) { 2059 server.get_ack_from_slaves = 1; 2060 } 2061 2062 /* Return the number of slaves that already acknowledged the specified 2063 * replication offset. */ 2064 int replicationCountAcksByOffset(long long offset) { 2065 listIter li; 2066 listNode *ln; 2067 int count = 0; 2068 2069 listRewind(server.slaves,&li); 2070 while((ln = listNext(&li))) { 2071 client *slave = ln->value; 2072 2073 if (slave->replstate != SLAVE_STATE_ONLINE) continue; 2074 if (slave->repl_ack_off >= offset) count++; 2075 } 2076 return count; 2077 } 2078 2079 /* WAIT for N replicas to acknowledge the processing of our latest 2080 * write command (and all the previous commands). */ 2081 void waitCommand(client *c) { 2082 mstime_t timeout; 2083 long numreplicas, ackreplicas; 2084 long long offset = c->woff; 2085 2086 /* Argument parsing. 
*/ 2087 if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK) 2088 return; 2089 if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS) 2090 != C_OK) return; 2091 2092 /* First try without blocking at all. */ 2093 ackreplicas = replicationCountAcksByOffset(c->woff); 2094 if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) { 2095 addReplyLongLong(c,ackreplicas); 2096 return; 2097 } 2098 2099 /* Otherwise block the client and put it into our list of clients 2100 * waiting for ack from slaves. */ 2101 c->bpop.timeout = timeout; 2102 c->bpop.reploffset = offset; 2103 c->bpop.numreplicas = numreplicas; 2104 listAddNodeTail(server.clients_waiting_acks,c); 2105 blockClient(c,BLOCKED_WAIT); 2106 2107 /* Make sure that the server will send an ACK request to all the slaves 2108 * before returning to the event loop. */ 2109 replicationRequestAckFromSlaves(); 2110 } 2111 2112 /* This is called by unblockClient() to perform the blocking op type 2113 * specific cleanup. We just remove the client from the list of clients 2114 * waiting for replica acks. Never call it directly, call unblockClient() 2115 * instead. */ 2116 void unblockClientWaitingReplicas(client *c) { 2117 listNode *ln = listSearchKey(server.clients_waiting_acks,c); 2118 serverAssert(ln != NULL); 2119 listDelNode(server.clients_waiting_acks,ln); 2120 } 2121 2122 /* Check if there are clients blocked in WAIT that can be unblocked since 2123 * we received enough ACKs from slaves. 
*/ 2124 void processClientsWaitingReplicas(void) { 2125 long long last_offset = 0; 2126 int last_numreplicas = 0; 2127 2128 listIter li; 2129 listNode *ln; 2130 2131 listRewind(server.clients_waiting_acks,&li); 2132 while((ln = listNext(&li))) { 2133 client *c = ln->value; 2134 2135 /* Every time we find a client that is satisfied for a given 2136 * offset and number of replicas, we remember it so the next client 2137 * may be unblocked without calling replicationCountAcksByOffset() 2138 * if the requested offset / replicas were equal or less. */ 2139 if (last_offset && last_offset > c->bpop.reploffset && 2140 last_numreplicas > c->bpop.numreplicas) 2141 { 2142 unblockClient(c); 2143 addReplyLongLong(c,last_numreplicas); 2144 } else { 2145 int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset); 2146 2147 if (numreplicas >= c->bpop.numreplicas) { 2148 last_offset = c->bpop.reploffset; 2149 last_numreplicas = numreplicas; 2150 unblockClient(c); 2151 addReplyLongLong(c,numreplicas); 2152 } 2153 } 2154 } 2155 } 2156 2157 /* Return the slave replication offset for this instance, that is 2158 * the offset for which we already processed the master replication stream. */ 2159 long long replicationGetSlaveOffset(void) { 2160 long long offset = 0; 2161 2162 if (server.masterhost != NULL) { 2163 if (server.master) { 2164 offset = server.master->reploff; 2165 } else if (server.cached_master) { 2166 offset = server.cached_master->reploff; 2167 } 2168 } 2169 /* offset may be -1 when the master does not support it at all, however 2170 * this function is designed to return an offset that can express the 2171 * amount of data processed by the master, so we return a positive 2172 * integer. */ 2173 if (offset < 0) offset = 0; 2174 return offset; 2175 } 2176 2177 /* --------------------------- REPLICATION CRON ---------------------------- */ 2178 2179 /* Replication cron function, called 1 time per second. 
*/
void replicationCron(void) {
    static long long cron_runs = 0;
    listIter iter;
    listNode *node;
    robj *ping_argv[1];

    /* Non blocking connection timeout? */
    if (server.masterhost &&
        (server.repl_state == REPL_STATE_CONNECTING ||
         slaveIsInHandshakeState()))
    {
        if ((time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) {
            serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
            cancelReplicationHandshake();
        }
    }

    /* Bulk transfer I/O timeout? */
    if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER) {
        if ((time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) {
            serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
            cancelReplicationHandshake();
        }
    }

    /* Timed out master when we are an already connected slave? */
    if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED) {
        if ((time(NULL)-server.master->lastinteraction) > server.repl_timeout)
        {
            serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
            freeClient(server.master);
        }
    }

    /* Check if we should connect to a MASTER */
    if (server.repl_state == REPL_STATE_CONNECT) {
        serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == C_OK) {
            serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }

    /* Send ACK to master from time to time.
     * Note that we do not send periodic acks to masters that don't
     * support PSYNC and replication offsets. */
    if (server.masterhost && server.master) {
        if (!(server.master->flags & CLIENT_PRE_PSYNC)) replicationSendAck();
    }

    /* If we have attached slaves, PING them from time to time.
     * So slaves can implement an explicit timeout to masters, and will
     * be able to detect a link disconnection even if the TCP connection
     * will not actually go down. */

    /* First, send PING according to ping_slave_period. */
    if ((cron_runs % server.repl_ping_slave_period) == 0) {
        ping_argv[0] = createStringObject("PING",4);
        replicationFeedSlaves(server.slaves, server.slaveseldb,
            ping_argv, 1);
        decrRefCount(ping_argv[0]);
    }

    /* Second, send a newline to all the slaves in pre-synchronization
     * stage, that is, slaves waiting for the master to create the RDB file.
     * The newline will be ignored by the slave but will refresh the
     * last-io timer preventing a timeout. In this case we ignore the
     * ping period and refresh the connection once per second since certain
     * timeouts are set at a few seconds (example: PSYNC response). */
    listRewind(server.slaves,&iter);
    while ((node = listNext(&iter)) != NULL) {
        client *slave = node->value;
        int waiting_start =
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START);
        int waiting_end_no_socket =
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
             server.rdb_child_type != RDB_CHILD_TYPE_SOCKET);

        if (!waiting_start && !waiting_end_no_socket) continue;
        if (write(slave->fd, "\n", 1) == -1) {
            /* Don't worry, it's just a ping. */
        }
    }

    /* Disconnect timedout slaves. Only online slaves that are not flagged
     * as pre-PSYNC are checked against the timeout. */
    if (listLength(server.slaves)) {
        listRewind(server.slaves,&iter);
        while ((node = listNext(&iter)) != NULL) {
            client *slave = node->value;

            if (slave->replstate != SLAVE_STATE_ONLINE ||
                (slave->flags & CLIENT_PRE_PSYNC))
            {
                continue;
            }
            if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
            {
                serverLog(LL_WARNING, "Disconnecting timedout slave: %s",
                    replicationGetSlaveName(slave));
                freeClient(slave);
            }
        }
    }

    /* If we have no attached slaves and there is a replication backlog
     * using memory, free it after some (configured) time. */
    if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
        server.repl_backlog)
    {
        time_t no_slaves_for = server.unixtime - server.repl_no_slaves_since;

        if (no_slaves_for > server.repl_backlog_time_limit) {
            freeReplicationBacklog();
            serverLog(LL_NOTICE,
                "Replication backlog freed after %d seconds "
                "without connected slaves.",
                (int) server.repl_backlog_time_limit);
        }
    }

    /* If AOF is disabled and we no longer have attached slaves, we can
     * free our Replication Script Cache as there is no need to propagate
     * EVALSHA at all. */
    if (listLength(server.slaves) == 0 &&
        server.aof_state == AOF_OFF &&
        listLength(server.repl_scriptcache_fifo) != 0)
    {
        replicationScriptCacheFlush();
    }

    /* If we are using diskless replication and there are slaves waiting
     * in WAIT_BGSAVE_START state, check if enough seconds elapsed and
     * start a BGSAVE.
     *
     * This code is also useful to trigger a BGSAVE if the diskless
     * replication was turned off with CONFIG SET, while there were already
     * slaves in WAIT_BGSAVE_START state. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
        time_t idle, longest_idle = 0;
        int waiting = 0;
        int common_capa = -1; /* -1 means no waiting slave seen so far. */

        listRewind(server.slaves,&iter);
        while ((node = listNext(&iter)) != NULL) {
            client *slave = node->value;

            if (slave->replstate != SLAVE_STATE_WAIT_BGSAVE_START) continue;

            idle = server.unixtime - slave->lastinteraction;
            if (idle > longest_idle) longest_idle = idle;
            waiting++;
            /* Keep only the capabilities shared by all waiting slaves. */
            if (common_capa == -1)
                common_capa = slave->slave_capa;
            else
                common_capa = common_capa & slave->slave_capa;
        }

        if (waiting && longest_idle > server.repl_diskless_sync_delay) {
            /* Start a BGSAVE. Usually with socket target, or with disk target
             * if there was a recent socket -> disk config change. */
            startBgsaveForReplication(common_capa);
        }
    }

    /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
    refreshGoodSlavesCount();
    cron_runs++; /* Incremented with frequency 1 HZ. */
}