1 /* Asynchronous replication implementation. 2 * 3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * * Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * * Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * * Neither the name of Redis nor the names of its contributors may be used 15 * to endorse or promote products derived from this software without 16 * specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 
29 */ 30 31 32 #include "redis.h" 33 34 #include <sys/time.h> 35 #include <unistd.h> 36 #include <fcntl.h> 37 #include <sys/socket.h> 38 #include <sys/stat.h> 39 40 void replicationDiscardCachedMaster(void); 41 void replicationResurrectCachedMaster(int newfd); 42 void replicationSendAck(void); 43 44 /* ---------------------------------- MASTER -------------------------------- */ 45 46 void createReplicationBacklog(void) { 47 redisAssert(server.repl_backlog == NULL); 48 server.repl_backlog = zmalloc(server.repl_backlog_size); 49 server.repl_backlog_histlen = 0; 50 server.repl_backlog_idx = 0; 51 /* When a new backlog buffer is created, we increment the replication 52 * offset by one to make sure we'll not be able to PSYNC with any 53 * previous slave. This is needed because we avoid incrementing the 54 * master_repl_offset if no backlog exists nor slaves are attached. */ 55 server.master_repl_offset++; 56 57 /* We don't have any data inside our buffer, but virtually the first 58 * byte we have is the next byte that will be generated for the 59 * replication stream. */ 60 server.repl_backlog_off = server.master_repl_offset+1; 61 } 62 63 /* This function is called when the user modifies the replication backlog 64 * size at runtime. It is up to the function to both update the 65 * server.repl_backlog_size and to resize the buffer and setup it so that 66 * it contains the same data as the previous one (possibly less data, but 67 * the most recent bytes, or the same data and more free space in case the 68 * buffer is enlarged). */ 69 void resizeReplicationBacklog(long long newsize) { 70 if (newsize < REDIS_REPL_BACKLOG_MIN_SIZE) 71 newsize = REDIS_REPL_BACKLOG_MIN_SIZE; 72 if (server.repl_backlog_size == newsize) return; 73 74 server.repl_backlog_size = newsize; 75 if (server.repl_backlog != NULL) { 76 /* What we actually do is to flush the old buffer and realloc a new 77 * empty one. It will refill with new data incrementally. 
78 * The reason is that copying a few gigabytes adds latency and even 79 * worse often we need to alloc additional space before freeing the 80 * old buffer. */ 81 zfree(server.repl_backlog); 82 server.repl_backlog = zmalloc(server.repl_backlog_size); 83 server.repl_backlog_histlen = 0; 84 server.repl_backlog_idx = 0; 85 /* Next byte we have is... the next since the buffer is emtpy. */ 86 server.repl_backlog_off = server.master_repl_offset+1; 87 } 88 } 89 90 void freeReplicationBacklog(void) { 91 redisAssert(listLength(server.slaves) == 0); 92 zfree(server.repl_backlog); 93 server.repl_backlog = NULL; 94 } 95 96 /* Add data to the replication backlog. 97 * This function also increments the global replication offset stored at 98 * server.master_repl_offset, because there is no case where we want to feed 99 * the backlog without incrementing the buffer. */ 100 void feedReplicationBacklog(void *ptr, size_t len) { 101 unsigned char *p = ptr; 102 103 server.master_repl_offset += len; 104 105 /* This is a circular buffer, so write as much data we can at every 106 * iteration and rewind the "idx" index if we reach the limit. */ 107 while(len) { 108 size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; 109 if (thislen > len) thislen = len; 110 memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); 111 server.repl_backlog_idx += thislen; 112 if (server.repl_backlog_idx == server.repl_backlog_size) 113 server.repl_backlog_idx = 0; 114 len -= thislen; 115 p += thislen; 116 server.repl_backlog_histlen += thislen; 117 } 118 if (server.repl_backlog_histlen > server.repl_backlog_size) 119 server.repl_backlog_histlen = server.repl_backlog_size; 120 /* Set the offset of the first byte we have in the backlog. */ 121 server.repl_backlog_off = server.master_repl_offset - 122 server.repl_backlog_histlen + 1; 123 } 124 125 /* Wrapper for feedReplicationBacklog() that takes Redis string objects 126 * as input. 
*/ 127 void feedReplicationBacklogWithObject(robj *o) { 128 char llstr[REDIS_LONGSTR_SIZE]; 129 void *p; 130 size_t len; 131 132 if (o->encoding == REDIS_ENCODING_INT) { 133 len = ll2string(llstr,sizeof(llstr),(long)o->ptr); 134 p = llstr; 135 } else { 136 len = sdslen(o->ptr); 137 p = o->ptr; 138 } 139 feedReplicationBacklog(p,len); 140 } 141 142 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { 143 listNode *ln; 144 listIter li; 145 int j, len; 146 char llstr[REDIS_LONGSTR_SIZE]; 147 148 /* If there aren't slaves, and there is no backlog buffer to populate, 149 * we can return ASAP. */ 150 if (server.repl_backlog == NULL && listLength(slaves) == 0) return; 151 152 /* We can't have slaves attached and no backlog. */ 153 redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); 154 155 /* Send SELECT command to every slave if needed. */ 156 if (server.slaveseldb != dictid) { 157 robj *selectcmd; 158 159 /* For a few DBs we have pre-computed SELECT command. */ 160 if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) { 161 selectcmd = shared.select[dictid]; 162 } else { 163 int dictid_len; 164 165 dictid_len = ll2string(llstr,sizeof(llstr),dictid); 166 selectcmd = createObject(REDIS_STRING, 167 sdscatprintf(sdsempty(), 168 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", 169 dictid_len, llstr)); 170 } 171 172 /* Add the SELECT command into the backlog. */ 173 if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); 174 175 /* Send it to slaves. */ 176 listRewind(slaves,&li); 177 while((ln = listNext(&li))) { 178 redisClient *slave = ln->value; 179 addReply(slave,selectcmd); 180 } 181 182 if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS) 183 decrRefCount(selectcmd); 184 } 185 server.slaveseldb = dictid; 186 187 /* Write the command to the replication backlog if any. */ 188 if (server.repl_backlog) { 189 char aux[REDIS_LONGSTR_SIZE+3]; 190 191 /* Add the multi bulk reply length. 
*/ 192 aux[0] = '*'; 193 len = ll2string(aux+1,sizeof(aux)-1,argc); 194 aux[len+1] = '\r'; 195 aux[len+2] = '\n'; 196 feedReplicationBacklog(aux,len+3); 197 198 for (j = 0; j < argc; j++) { 199 long objlen = stringObjectLen(argv[j]); 200 201 /* We need to feed the buffer with the object as a bulk reply 202 * not just as a plain string, so create the $..CRLF payload len 203 * ad add the final CRLF */ 204 aux[0] = '$'; 205 len = ll2string(aux+1,sizeof(aux)-1,objlen); 206 aux[len+1] = '\r'; 207 aux[len+2] = '\n'; 208 feedReplicationBacklog(aux,len+3); 209 feedReplicationBacklogWithObject(argv[j]); 210 feedReplicationBacklog(aux+len+1,2); 211 } 212 } 213 214 /* Write the command to every slave. */ 215 listRewind(slaves,&li); 216 while((ln = listNext(&li))) { 217 redisClient *slave = ln->value; 218 219 /* Don't feed slaves that are still waiting for BGSAVE to start */ 220 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue; 221 222 /* Feed slaves that are waiting for the initial SYNC (so these commands 223 * are queued in the output buffer until the initial SYNC completes), 224 * or are already in sync with the master. */ 225 226 /* Add the multi bulk length. */ 227 addReplyMultiBulkLen(slave,argc); 228 229 /* Finally any additional argument that was not stored inside the 230 * static buffer if any (from j to argc). 
*/ 231 for (j = 0; j < argc; j++) 232 addReplyBulk(slave,argv[j]); 233 } 234 } 235 236 void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) { 237 listNode *ln; 238 listIter li; 239 int j; 240 sds cmdrepr = sdsnew("+"); 241 robj *cmdobj; 242 struct timeval tv; 243 244 gettimeofday(&tv,NULL); 245 cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); 246 if (c->flags & REDIS_LUA_CLIENT) { 247 cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); 248 } else if (c->flags & REDIS_UNIX_SOCKET) { 249 cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); 250 } else { 251 cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c)); 252 } 253 254 for (j = 0; j < argc; j++) { 255 if (argv[j]->encoding == REDIS_ENCODING_INT) { 256 cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr); 257 } else { 258 cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, 259 sdslen(argv[j]->ptr)); 260 } 261 if (j != argc-1) 262 cmdrepr = sdscatlen(cmdrepr," ",1); 263 } 264 cmdrepr = sdscatlen(cmdrepr,"\r\n",2); 265 cmdobj = createObject(REDIS_STRING,cmdrepr); 266 267 listRewind(monitors,&li); 268 while((ln = listNext(&li))) { 269 redisClient *monitor = ln->value; 270 addReply(monitor,cmdobj); 271 } 272 decrRefCount(cmdobj); 273 } 274 275 /* Feed the slave 'c' with the replication backlog starting from the 276 * specified 'offset' up to the end of the backlog. 
*/ 277 long long addReplyReplicationBacklog(redisClient *c, long long offset) { 278 long long j, skip, len; 279 280 redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset); 281 282 if (server.repl_backlog_histlen == 0) { 283 redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero"); 284 return 0; 285 } 286 287 redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld", 288 server.repl_backlog_size); 289 redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld", 290 server.repl_backlog_off); 291 redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld", 292 server.repl_backlog_histlen); 293 redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld", 294 server.repl_backlog_idx); 295 296 /* Compute the amount of bytes we need to discard. */ 297 skip = offset - server.repl_backlog_off; 298 redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip); 299 300 /* Point j to the oldest byte, that is actaully our 301 * server.repl_backlog_off byte. */ 302 j = (server.repl_backlog_idx + 303 (server.repl_backlog_size-server.repl_backlog_histlen)) % 304 server.repl_backlog_size; 305 redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j); 306 307 /* Discard the amount of data to seek to the specified 'offset'. */ 308 j = (j + skip) % server.repl_backlog_size; 309 310 /* Feed slave with data. Since it is a circular buffer we have to 311 * split the reply in two parts if we are cross-boundary. */ 312 len = server.repl_backlog_histlen - skip; 313 redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len); 314 while(len) { 315 long long thislen = 316 ((server.repl_backlog_size - j) < len) ? 
317 (server.repl_backlog_size - j) : len; 318 319 redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen); 320 addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); 321 len -= thislen; 322 j = 0; 323 } 324 return server.repl_backlog_histlen - skip; 325 } 326 327 /* This function handles the PSYNC command from the point of view of a 328 * master receiving a request for partial resynchronization. 329 * 330 * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed 331 * with the usual full resync. */ 332 int masterTryPartialResynchronization(redisClient *c) { 333 long long psync_offset, psync_len; 334 char *master_runid = c->argv[1]->ptr; 335 char buf[128]; 336 int buflen; 337 338 /* Is the runid of this master the same advertised by the wannabe slave 339 * via PSYNC? If runid changed this master is a different instance and 340 * there is no way to continue. */ 341 if (strcasecmp(master_runid, server.runid)) { 342 /* Run id "?" is used by slaves that want to force a full resync. */ 343 if (master_runid[0] != '?') { 344 redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: " 345 "Runid mismatch (Client asked for runid '%s', my runid is '%s')", 346 master_runid, server.runid); 347 } else { 348 redisLog(REDIS_NOTICE,"Full resync requested by slave."); 349 } 350 goto need_full_resync; 351 } 352 353 /* We still have the data our slave is asking for? 
*/ 354 if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != 355 REDIS_OK) goto need_full_resync; 356 if (!server.repl_backlog || 357 psync_offset < server.repl_backlog_off || 358 psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) 359 { 360 redisLog(REDIS_NOTICE, 361 "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset); 362 if (psync_offset > server.master_repl_offset) { 363 redisLog(REDIS_WARNING, 364 "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset."); 365 } 366 goto need_full_resync; 367 } 368 369 /* If we reached this point, we are able to perform a partial resync: 370 * 1) Set client state to make it a slave. 371 * 2) Inform the client we can continue with +CONTINUE 372 * 3) Send the backlog data (from the offset to the end) to the slave. */ 373 c->flags |= REDIS_SLAVE; 374 c->replstate = REDIS_REPL_ONLINE; 375 c->repl_ack_time = server.unixtime; 376 listAddNodeTail(server.slaves,c); 377 /* We can't use the connection buffers since they are used to accumulate 378 * new commands at this stage. But we are sure the socket send buffer is 379 * emtpy so this write will never fail actually. */ 380 buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); 381 if (write(c->fd,buf,buflen) != buflen) { 382 freeClientAsync(c); 383 return REDIS_OK; 384 } 385 psync_len = addReplyReplicationBacklog(c,psync_offset); 386 redisLog(REDIS_NOTICE, 387 "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset); 388 /* Note that we don't need to set the selected DB at server.slaveseldb 389 * to -1 to force the master to emit SELECT, since the slave already 390 * has this state from the previous connection with the master. */ 391 392 refreshGoodSlavesCount(); 393 return REDIS_OK; /* The caller can return, no full resync needed. 
*/ 394 395 need_full_resync: 396 /* We need a full resync for some reason... notify the client. */ 397 psync_offset = server.master_repl_offset; 398 /* Add 1 to psync_offset if it the replication backlog does not exists 399 * as when it will be created later we'll increment the offset by one. */ 400 if (server.repl_backlog == NULL) psync_offset++; 401 /* Again, we can't use the connection buffers (see above). */ 402 buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", 403 server.runid,psync_offset); 404 if (write(c->fd,buf,buflen) != buflen) { 405 freeClientAsync(c); 406 return REDIS_OK; 407 } 408 return REDIS_ERR; 409 } 410 411 /* SYNC ad PSYNC command implemenation. */ 412 void syncCommand(redisClient *c) { 413 /* ignore SYNC if already slave or in monitor mode */ 414 if (c->flags & REDIS_SLAVE) return; 415 416 /* Refuse SYNC requests if we are a slave but the link with our master 417 * is not ok... */ 418 if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) { 419 addReplyError(c,"Can't SYNC while not connected with my master"); 420 return; 421 } 422 423 /* SYNC can't be issued when the server has pending data to send to 424 * the client about already issued commands. We need a fresh reply 425 * buffer registering the differences between the BGSAVE and the current 426 * dataset, so that we can copy to other slaves if needed. */ 427 if (listLength(c->reply) != 0 || c->bufpos != 0) { 428 addReplyError(c,"SYNC and PSYNC are invalid with pending output"); 429 return; 430 } 431 432 redisLog(REDIS_NOTICE,"Slave asks for synchronization"); 433 434 /* Try a partial resynchronization if this is a PSYNC command. 435 * If it fails, we continue with usual full resynchronization, however 436 * when this happens masterTryPartialResynchronization() already 437 * replied with: 438 * 439 * +FULLRESYNC <runid> <offset> 440 * 441 * So the slave knows the new runid and offset to try a PSYNC later 442 * if the connection with the master is lost. 
*/ 443 if (!strcasecmp(c->argv[0]->ptr,"psync")) { 444 if (masterTryPartialResynchronization(c) == REDIS_OK) { 445 server.stat_sync_partial_ok++; 446 return; /* No full resync needed, return. */ 447 } else { 448 char *master_runid = c->argv[1]->ptr; 449 450 /* Increment stats for failed PSYNCs, but only if the 451 * runid is not "?", as this is used by slaves to force a full 452 * resync on purpose when they are not albe to partially 453 * resync. */ 454 if (master_runid[0] != '?') server.stat_sync_partial_err++; 455 } 456 } else { 457 /* If a slave uses SYNC, we are dealing with an old implementation 458 * of the replication protocol (like redis-cli --slave). Flag the client 459 * so that we don't expect to receive REPLCONF ACK feedbacks. */ 460 c->flags |= REDIS_PRE_PSYNC; 461 } 462 463 /* Full resynchronization. */ 464 server.stat_sync_full++; 465 466 /* Here we need to check if there is a background saving operation 467 * in progress, or if it is required to start one */ 468 if (server.rdb_child_pid != -1) { 469 /* Ok a background save is in progress. Let's check if it is a good 470 * one for replication, i.e. if there is another slave that is 471 * registering differences since the server forked to save */ 472 redisClient *slave; 473 listNode *ln; 474 listIter li; 475 476 listRewind(server.slaves,&li); 477 while((ln = listNext(&li))) { 478 slave = ln->value; 479 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break; 480 } 481 if (ln) { 482 /* Perfect, the server is already registering differences for 483 * another slave. Set the right state, and copy the buffer. 
*/ 484 copyClientOutputBuffer(c,slave); 485 c->replstate = REDIS_REPL_WAIT_BGSAVE_END; 486 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); 487 } else { 488 /* No way, we need to wait for the next BGSAVE in order to 489 * register differences */ 490 c->replstate = REDIS_REPL_WAIT_BGSAVE_START; 491 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); 492 } 493 } else { 494 /* Ok we don't have a BGSAVE in progress, let's start one */ 495 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); 496 if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { 497 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); 498 addReplyError(c,"Unable to perform background save"); 499 return; 500 } 501 c->replstate = REDIS_REPL_WAIT_BGSAVE_END; 502 /* Flush the script cache for the new slave. */ 503 replicationScriptCacheFlush(); 504 } 505 506 if (server.repl_disable_tcp_nodelay) 507 anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */ 508 c->repldbfd = -1; 509 c->flags |= REDIS_SLAVE; 510 server.slaveseldb = -1; /* Force to re-emit the SELECT command. */ 511 listAddNodeTail(server.slaves,c); 512 if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) 513 createReplicationBacklog(); 514 return; 515 } 516 517 /* REPLCONF <option> <value> <option> <value> ... 518 * This command is used by a slave in order to configure the replication 519 * process before starting it with the SYNC command. 520 * 521 * Currently the only use of this command is to communicate to the master 522 * what is the listening port of the Slave redis instance, so that the 523 * master can accurately list slaves and their listening ports in 524 * the INFO output. 525 * 526 * In the future the same command can be used in order to configure 527 * the replication to initiate an incremental replication instead of a 528 * full resync. 
*/ 529 void replconfCommand(redisClient *c) { 530 int j; 531 532 if ((c->argc % 2) == 0) { 533 /* Number of arguments must be odd to make sure that every 534 * option has a corresponding value. */ 535 addReply(c,shared.syntaxerr); 536 return; 537 } 538 539 /* Process every option-value pair. */ 540 for (j = 1; j < c->argc; j+=2) { 541 if (!strcasecmp(c->argv[j]->ptr,"listening-port")) { 542 long port; 543 544 if ((getLongFromObjectOrReply(c,c->argv[j+1], 545 &port,NULL) != REDIS_OK)) 546 return; 547 c->slave_listening_port = port; 548 } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { 549 /* REPLCONF ACK is used by slave to inform the master the amount 550 * of replication stream that it processed so far. It is an 551 * internal only command that normal clients should never use. */ 552 long long offset; 553 554 if (!(c->flags & REDIS_SLAVE)) return; 555 if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK)) 556 return; 557 if (offset > c->repl_ack_off) 558 c->repl_ack_off = offset; 559 c->repl_ack_time = server.unixtime; 560 /* Note: this command does not reply anything! */ 561 return; 562 } else if (!strcasecmp(c->argv[j]->ptr,"getack")) { 563 /* REPLCONF GETACK is used in order to request an ACK ASAP 564 * to the slave. */ 565 if (server.masterhost && server.master) replicationSendAck(); 566 /* Note: this command does not reply anything! */ 567 } else { 568 addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", 569 (char*)c->argv[j]->ptr); 570 return; 571 } 572 } 573 addReply(c,shared.ok); 574 } 575 576 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { 577 redisClient *slave = privdata; 578 REDIS_NOTUSED(el); 579 REDIS_NOTUSED(mask); 580 char buf[REDIS_IOBUF_LEN]; 581 ssize_t nwritten, buflen; 582 583 /* Before sending the RDB file, we send the preamble as configured by the 584 * replication process. Currently the preamble is just the bulk count of 585 * the file in the form "$<length>\r\n". 
*/ 586 if (slave->replpreamble) { 587 nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble)); 588 if (nwritten == -1) { 589 redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s", 590 strerror(errno)); 591 freeClient(slave); 592 return; 593 } 594 sdsrange(slave->replpreamble,nwritten,-1); 595 if (sdslen(slave->replpreamble) == 0) { 596 sdsfree(slave->replpreamble); 597 slave->replpreamble = NULL; 598 /* fall through sending data. */ 599 } else { 600 return; 601 } 602 } 603 604 /* If the preamble was already transfered, send the RDB bulk data. */ 605 lseek(slave->repldbfd,slave->repldboff,SEEK_SET); 606 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); 607 if (buflen <= 0) { 608 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s", 609 (buflen == 0) ? "premature EOF" : strerror(errno)); 610 freeClient(slave); 611 return; 612 } 613 if ((nwritten = write(fd,buf,buflen)) == -1) { 614 if (errno != EAGAIN) { 615 redisLog(REDIS_WARNING,"Write error sending DB to slave: %s", 616 strerror(errno)); 617 freeClient(slave); 618 } 619 return; 620 } 621 slave->repldboff += nwritten; 622 if (slave->repldboff == slave->repldbsize) { 623 close(slave->repldbfd); 624 slave->repldbfd = -1; 625 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 626 slave->replstate = REDIS_REPL_ONLINE; 627 slave->repl_ack_time = server.unixtime; 628 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, 629 sendReplyToClient, slave) == AE_ERR) { 630 redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno)); 631 freeClient(slave); 632 return; 633 } 634 refreshGoodSlavesCount(); 635 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded"); 636 } 637 } 638 639 /* This function is called at the end of every background saving. 640 * The argument bgsaveerr is REDIS_OK if the background saving succeeded 641 * otherwise REDIS_ERR is passed to the function. 
642 * 643 * The goal of this function is to handle slaves waiting for a successful 644 * background saving in order to perform non-blocking synchronization. */ 645 void updateSlavesWaitingBgsave(int bgsaveerr) { 646 listNode *ln; 647 int startbgsave = 0; 648 listIter li; 649 650 listRewind(server.slaves,&li); 651 while((ln = listNext(&li))) { 652 redisClient *slave = ln->value; 653 654 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { 655 startbgsave = 1; 656 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; 657 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { 658 struct redis_stat buf; 659 660 if (bgsaveerr != REDIS_OK) { 661 freeClient(slave); 662 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); 663 continue; 664 } 665 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || 666 redis_fstat(slave->repldbfd,&buf) == -1) { 667 freeClient(slave); 668 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); 669 continue; 670 } 671 slave->repldboff = 0; 672 slave->repldbsize = buf.st_size; 673 slave->replstate = REDIS_REPL_SEND_BULK; 674 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", 675 (unsigned long long) slave->repldbsize); 676 677 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 678 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { 679 freeClient(slave); 680 continue; 681 } 682 } 683 } 684 if (startbgsave) { 685 /* Since we are starting a new background save for one or more slaves, 686 * we flush the Replication Script Cache to use EVAL to propagate every 687 * new EVALSHA for the first time, since all the new slaves don't know 688 * about previous scripts. */ 689 replicationScriptCacheFlush(); 690 if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { 691 listIter li; 692 693 listRewind(server.slaves,&li); 694 redisLog(REDIS_WARNING,"SYNC failed. 
BGSAVE failed"); 695 while((ln = listNext(&li))) { 696 redisClient *slave = ln->value; 697 698 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) 699 freeClient(slave); 700 } 701 } 702 } 703 } 704 705 /* ----------------------------------- SLAVE -------------------------------- */ 706 707 /* Abort the async download of the bulk dataset while SYNC-ing with master */ 708 void replicationAbortSyncTransfer(void) { 709 redisAssert(server.repl_state == REDIS_REPL_TRANSFER); 710 711 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); 712 close(server.repl_transfer_s); 713 close(server.repl_transfer_fd); 714 unlink(server.repl_transfer_tmpfile); 715 zfree(server.repl_transfer_tmpfile); 716 server.repl_state = REDIS_REPL_CONNECT; 717 } 718 719 /* Avoid the master to detect the slave is timing out while loading the 720 * RDB file in initial synchronization. We send a single newline character 721 * that is valid protocol but is guaranteed to either be sent entierly or 722 * not, since the byte is indivisible. 723 * 724 * The function is called in two contexts: while we flush the current 725 * data with emptyDb(), and while we load the new data received as an 726 * RDB file from the master. */ 727 void replicationSendNewlineToMaster(void) { 728 static time_t newline_sent; 729 if (time(NULL) != newline_sent) { 730 newline_sent = time(NULL); 731 if (write(server.repl_transfer_s,"\n",1) == -1) { 732 /* Pinging back in this stage is best-effort. */ 733 } 734 } 735 } 736 737 /* Callback used by emptyDb() while flushing away old data to load 738 * the new dataset received by the master. 
*/ 739 void replicationEmptyDbCallback(void *privdata) { 740 REDIS_NOTUSED(privdata); 741 replicationSendNewlineToMaster(); 742 } 743 744 /* Asynchronously read the SYNC payload we receive from a master */ 745 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ 746 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { 747 char buf[4096]; 748 ssize_t nread, readlen; 749 off_t left; 750 REDIS_NOTUSED(el); 751 REDIS_NOTUSED(privdata); 752 REDIS_NOTUSED(mask); 753 754 /* If repl_transfer_size == -1 we still have to read the bulk length 755 * from the master reply. */ 756 if (server.repl_transfer_size == -1) { 757 if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { 758 redisLog(REDIS_WARNING, 759 "I/O error reading bulk count from MASTER: %s", 760 strerror(errno)); 761 goto error; 762 } 763 764 if (buf[0] == '-') { 765 redisLog(REDIS_WARNING, 766 "MASTER aborted replication with an error: %s", 767 buf+1); 768 goto error; 769 } else if (buf[0] == '\0') { 770 /* At this stage just a newline works as a PING in order to take 771 * the connection live. So we refresh our last interaction 772 * timestamp. */ 773 server.repl_transfer_lastio = server.unixtime; 774 return; 775 } else if (buf[0] != '$') { 776 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); 777 goto error; 778 } 779 server.repl_transfer_size = strtol(buf+1,NULL,10); 780 redisLog(REDIS_NOTICE, 781 "MASTER <-> SLAVE sync: receiving %lld bytes from master", 782 (long long) server.repl_transfer_size); 783 return; 784 } 785 786 /* Read bulk data */ 787 left = server.repl_transfer_size - server.repl_transfer_read; 788 readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); 789 nread = read(fd,buf,readlen); 790 if (nread <= 0) { 791 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", 792 (nread == -1) ? 
strerror(errno) : "connection lost"); 793 replicationAbortSyncTransfer(); 794 return; 795 } 796 server.repl_transfer_lastio = server.unixtime; 797 if (write(server.repl_transfer_fd,buf,nread) != nread) { 798 redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); 799 goto error; 800 } 801 server.repl_transfer_read += nread; 802 803 /* Sync data on disk from time to time, otherwise at the end of the transfer 804 * we may suffer a big delay as the memory buffers are copied into the 805 * actual disk. */ 806 if (server.repl_transfer_read >= 807 server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) 808 { 809 off_t sync_size = server.repl_transfer_read - 810 server.repl_transfer_last_fsync_off; 811 rdb_fsync_range(server.repl_transfer_fd, 812 server.repl_transfer_last_fsync_off, sync_size); 813 server.repl_transfer_last_fsync_off += sync_size; 814 } 815 816 /* Check if the transfer is now complete */ 817 if (server.repl_transfer_read == server.repl_transfer_size) { 818 if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { 819 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); 820 replicationAbortSyncTransfer(); 821 return; 822 } 823 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data"); 824 signalFlushedDb(-1); 825 emptyDb(replicationEmptyDbCallback); 826 /* Before loading the DB into memory we need to delete the readable 827 * handler, otherwise it will get called recursively since 828 * rdbLoad() will call the event loop to process events from time to 829 * time for non blocking loading. 
*/ 830 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); 831 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); 832 if (rdbLoad(server.rdb_filename) != REDIS_OK) { 833 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); 834 replicationAbortSyncTransfer(); 835 return; 836 } 837 /* Final setup of the connected slave <- master link */ 838 zfree(server.repl_transfer_tmpfile); 839 close(server.repl_transfer_fd); 840 server.master = createClient(server.repl_transfer_s); 841 server.master->flags |= REDIS_MASTER; 842 server.master->authenticated = 1; 843 server.repl_state = REDIS_REPL_CONNECTED; 844 server.master->reploff = server.repl_master_initial_offset; 845 memcpy(server.master->replrunid, server.repl_master_runid, 846 sizeof(server.repl_master_runid)); 847 /* If master offset is set to -1, this master is old and is not 848 * PSYNC capable, so we flag it accordingly. */ 849 if (server.master->reploff == -1) 850 server.master->flags |= REDIS_PRE_PSYNC; 851 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success"); 852 /* Restart the AOF subsystem now that we finished the sync. This 853 * will trigger an AOF rewrite, and when done will start appending 854 * to the new file. */ 855 if (server.aof_state != REDIS_AOF_OFF) { 856 int retry = 10; 857 858 stopAppendOnly(); 859 while (retry-- && startAppendOnly() == REDIS_ERR) { 860 redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second."); 861 sleep(1); 862 } 863 if (!retry) { 864 redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now."); 865 exit(1); 866 } 867 } 868 } 869 870 return; 871 872 error: 873 replicationAbortSyncTransfer(); 874 return; 875 } 876 877 /* Send a synchronous command to the master. Used to send AUTH and 878 * REPLCONF commands before starting the replication with SYNC. 
879 * 880 * The command returns an sds string representing the result of the 881 * operation. On error the first byte is a "-". 882 */ 883 char *sendSynchronousCommand(int fd, ...) { 884 va_list ap; 885 sds cmd = sdsempty(); 886 char *arg, buf[256]; 887 888 /* Create the command to send to the master, we use simple inline 889 * protocol for simplicity as currently we only send simple strings. */ 890 va_start(ap,fd); 891 while(1) { 892 arg = va_arg(ap, char*); 893 if (arg == NULL) break; 894 895 if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1); 896 cmd = sdscat(cmd,arg); 897 } 898 cmd = sdscatlen(cmd,"\r\n",2); 899 900 /* Transfer command to the server. */ 901 if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) { 902 sdsfree(cmd); 903 return sdscatprintf(sdsempty(),"-Writing to master: %s", 904 strerror(errno)); 905 } 906 sdsfree(cmd); 907 908 /* Read the reply from the server. */ 909 if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1) 910 { 911 return sdscatprintf(sdsempty(),"-Reading from master: %s", 912 strerror(errno)); 913 } 914 return sdsnew(buf); 915 } 916 917 /* Try a partial resynchronization with the master if we are about to reconnect. 918 * If there is no cached master structure, at least try to issue a 919 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC 920 * command in order to obtain the master run id and the master replication 921 * global offset. 922 * 923 * This function is designed to be called from syncWithMaster(), so the 924 * following assumptions are made: 925 * 926 * 1) We pass the function an already connected socket "fd". 927 * 2) This function does not close the file descriptor "fd". However in case 928 * of successful partial resynchronization, the function will reuse 929 * 'fd' as file descriptor of the server.master client structure. 930 * 931 * The function returns: 932 * 933 * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue. 
934 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed. 935 * In this case the master run_id and global replication 936 * offset is saved. 937 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and 938 * the caller should fall back to SYNC. 939 */ 940 941 #define PSYNC_CONTINUE 0 942 #define PSYNC_FULLRESYNC 1 943 #define PSYNC_NOT_SUPPORTED 2 944 int slaveTryPartialResynchronization(int fd) { 945 char *psync_runid; 946 char psync_offset[32]; 947 sds reply; 948 949 /* Initially set repl_master_initial_offset to -1 to mark the current 950 * master run_id and offset as not valid. Later if we'll be able to do 951 * a FULL resync using the PSYNC command we'll set the offset at the 952 * right value, so that this information will be propagated to the 953 * client structure representing the master into server.master. */ 954 server.repl_master_initial_offset = -1; 955 956 if (server.cached_master) { 957 psync_runid = server.cached_master->replrunid; 958 snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); 959 redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); 960 } else { 961 redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)"); 962 psync_runid = "?"; 963 memcpy(psync_offset,"-1",3); 964 } 965 966 /* Issue the PSYNC command */ 967 reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL); 968 969 if (!strncmp(reply,"+FULLRESYNC",11)) { 970 char *runid = NULL, *offset = NULL; 971 972 /* FULL RESYNC, parse the reply in order to extract the run id 973 * and the replication offset. 
*/ 974 runid = strchr(reply,' '); 975 if (runid) { 976 runid++; 977 offset = strchr(runid,' '); 978 if (offset) offset++; 979 } 980 if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) { 981 redisLog(REDIS_WARNING, 982 "Master replied with wrong +FULLRESYNC syntax."); 983 /* This is an unexpected condition, actually the +FULLRESYNC 984 * reply means that the master supports PSYNC, but the reply 985 * format seems wrong. To stay safe we blank the master 986 * runid to make sure next PSYNCs will fail. */ 987 memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1); 988 } else { 989 memcpy(server.repl_master_runid, runid, offset-runid-1); 990 server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0'; 991 server.repl_master_initial_offset = strtoll(offset,NULL,10); 992 redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld", 993 server.repl_master_runid, 994 server.repl_master_initial_offset); 995 } 996 /* We are going to full resync, discard the cached master structure. */ 997 replicationDiscardCachedMaster(); 998 sdsfree(reply); 999 return PSYNC_FULLRESYNC; 1000 } 1001 1002 if (!strncmp(reply,"+CONTINUE",9)) { 1003 /* Partial resync was accepted, set the replication state accordingly */ 1004 redisLog(REDIS_NOTICE, 1005 "Successful partial resynchronization with master."); 1006 sdsfree(reply); 1007 replicationResurrectCachedMaster(fd); 1008 return PSYNC_CONTINUE; 1009 } 1010 1011 /* If we reach this point we receied either an error since the master does 1012 * not understand PSYNC, or an unexpected reply from the master. 1013 * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */ 1014 1015 if (strncmp(reply,"-ERR",4)) { 1016 /* If it's not an error, log the unexpected event. 
*/ 1017 redisLog(REDIS_WARNING, 1018 "Unexpected reply to PSYNC from master: %s", reply); 1019 } else { 1020 redisLog(REDIS_NOTICE, 1021 "Master does not support PSYNC or is in " 1022 "error state (reply: %s)", reply); 1023 } 1024 sdsfree(reply); 1025 replicationDiscardCachedMaster(); 1026 return PSYNC_NOT_SUPPORTED; 1027 } 1028 1029 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { 1030 char tmpfile[256], *err; 1031 int dfd, maxtries = 5; 1032 int sockerr = 0, psync_result; 1033 socklen_t errlen = sizeof(sockerr); 1034 REDIS_NOTUSED(el); 1035 REDIS_NOTUSED(privdata); 1036 REDIS_NOTUSED(mask); 1037 1038 /* If this event fired after the user turned the instance into a master 1039 * with SLAVEOF NO ONE we must just return ASAP. */ 1040 if (server.repl_state == REDIS_REPL_NONE) { 1041 close(fd); 1042 return; 1043 } 1044 1045 /* Check for errors in the socket. */ 1046 if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) 1047 sockerr = errno; 1048 if (sockerr) { 1049 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1050 redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s", 1051 strerror(sockerr)); 1052 goto error; 1053 } 1054 1055 /* If we were connecting, it's time to send a non blocking PING, we want to 1056 * make sure the master is able to reply before going into the actual 1057 * replication process where we have long timeouts in the order of 1058 * seconds (in the meantime the slave would block). */ 1059 if (server.repl_state == REDIS_REPL_CONNECTING) { 1060 redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event."); 1061 /* Delete the writable event so that the readable event remains 1062 * registered and we can wait for the PONG reply. */ 1063 aeDeleteFileEvent(server.el,fd,AE_WRITABLE); 1064 server.repl_state = REDIS_REPL_RECEIVE_PONG; 1065 /* Send the PING, don't check for errors at all, we have the timeout 1066 * that will take care about this. 
*/ 1067 syncWrite(fd,"PING\r\n",6,100); 1068 return; 1069 } 1070 1071 /* Receive the PONG command. */ 1072 if (server.repl_state == REDIS_REPL_RECEIVE_PONG) { 1073 char buf[1024]; 1074 1075 /* Delete the readable event, we no longer need it now that there is 1076 * the PING reply to read. */ 1077 aeDeleteFileEvent(server.el,fd,AE_READABLE); 1078 1079 /* Read the reply with explicit timeout. */ 1080 buf[0] = '\0'; 1081 if (syncReadLine(fd,buf,sizeof(buf), 1082 server.repl_syncio_timeout*1000) == -1) 1083 { 1084 redisLog(REDIS_WARNING, 1085 "I/O error reading PING reply from master: %s", 1086 strerror(errno)); 1087 goto error; 1088 } 1089 1090 /* We accept only two replies as valid, a positive +PONG reply 1091 * (we just check for "+") or an authentication error. 1092 * Note that older versions of Redis replied with "operation not 1093 * permitted" instead of using a proper error code, so we test 1094 * both. */ 1095 if (buf[0] != '+' && 1096 strncmp(buf,"-NOAUTH",7) != 0 && 1097 strncmp(buf,"-ERR operation not permitted",28) != 0) 1098 { 1099 redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf); 1100 goto error; 1101 } else { 1102 redisLog(REDIS_NOTICE, 1103 "Master replied to PING, replication can continue..."); 1104 } 1105 } 1106 1107 /* AUTH with the master if required. */ 1108 if(server.masterauth) { 1109 err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL); 1110 if (err[0] == '-') { 1111 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err); 1112 sdsfree(err); 1113 goto error; 1114 } 1115 sdsfree(err); 1116 } 1117 1118 /* Set the slave port, so that Master's INFO command can list the 1119 * slave listening port correctly. */ 1120 { 1121 sds port = sdsfromlonglong(server.port); 1122 err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port, 1123 NULL); 1124 sdsfree(port); 1125 /* Ignore the error if any, not all the Redis versions support 1126 * REPLCONF listening-port. 
*/ 1127 if (err[0] == '-') { 1128 redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err); 1129 } 1130 sdsfree(err); 1131 } 1132 1133 /* Try a partial resynchonization. If we don't have a cached master 1134 * slaveTryPartialResynchronization() will at least try to use PSYNC 1135 * to start a full resynchronization so that we get the master run id 1136 * and the global offset, to try a partial resync at the next 1137 * reconnection attempt. */ 1138 psync_result = slaveTryPartialResynchronization(fd); 1139 if (psync_result == PSYNC_CONTINUE) { 1140 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization."); 1141 return; 1142 } 1143 1144 /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC 1145 * and the server.repl_master_runid and repl_master_initial_offset are 1146 * already populated. */ 1147 if (psync_result == PSYNC_NOT_SUPPORTED) { 1148 redisLog(REDIS_NOTICE,"Retrying with SYNC..."); 1149 if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { 1150 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s", 1151 strerror(errno)); 1152 goto error; 1153 } 1154 } 1155 1156 /* Prepare a suitable temp file for bulk transfer */ 1157 while(maxtries--) { 1158 snprintf(tmpfile,256, 1159 "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); 1160 dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); 1161 if (dfd != -1) break; 1162 sleep(1); 1163 } 1164 if (dfd == -1) { 1165 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno)); 1166 goto error; 1167 } 1168 1169 /* Setup the non blocking download of the bulk file. 
*/ 1170 if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) 1171 == AE_ERR) 1172 { 1173 redisLog(REDIS_WARNING, 1174 "Can't create readable event for SYNC: %s (fd=%d)", 1175 strerror(errno),fd); 1176 goto error; 1177 } 1178 1179 server.repl_state = REDIS_REPL_TRANSFER; 1180 server.repl_transfer_size = -1; 1181 server.repl_transfer_read = 0; 1182 server.repl_transfer_last_fsync_off = 0; 1183 server.repl_transfer_fd = dfd; 1184 server.repl_transfer_lastio = server.unixtime; 1185 server.repl_transfer_tmpfile = zstrdup(tmpfile); 1186 return; 1187 1188 error: 1189 close(fd); 1190 server.repl_transfer_s = -1; 1191 server.repl_state = REDIS_REPL_CONNECT; 1192 return; 1193 } 1194 1195 int connectWithMaster(void) { 1196 int fd; 1197 1198 fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport); 1199 if (fd == -1) { 1200 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s", 1201 strerror(errno)); 1202 return REDIS_ERR; 1203 } 1204 1205 if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == 1206 AE_ERR) 1207 { 1208 close(fd); 1209 redisLog(REDIS_WARNING,"Can't create readable event for SYNC"); 1210 return REDIS_ERR; 1211 } 1212 1213 server.repl_transfer_lastio = server.unixtime; 1214 server.repl_transfer_s = fd; 1215 server.repl_state = REDIS_REPL_CONNECTING; 1216 return REDIS_OK; 1217 } 1218 1219 /* This function can be called when a non blocking connection is currently 1220 * in progress to undo it. 
*/ 1221 void undoConnectWithMaster(void) { 1222 int fd = server.repl_transfer_s; 1223 1224 redisAssert(server.repl_state == REDIS_REPL_CONNECTING || 1225 server.repl_state == REDIS_REPL_RECEIVE_PONG); 1226 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1227 close(fd); 1228 server.repl_transfer_s = -1; 1229 server.repl_state = REDIS_REPL_CONNECT; 1230 } 1231 1232 /* This function aborts a non blocking replication attempt if there is one 1233 * in progress, by canceling the non-blocking connect attempt or 1234 * the initial bulk transfer. 1235 * 1236 * If there was a replication handshake in progress 1 is returned and 1237 * the replication state (server.repl_state) set to REDIS_REPL_CONNECT. 1238 * 1239 * Otherwise zero is returned and no operation is perforemd at all. */ 1240 int cancelReplicationHandshake(void) { 1241 if (server.repl_state == REDIS_REPL_TRANSFER) { 1242 replicationAbortSyncTransfer(); 1243 } else if (server.repl_state == REDIS_REPL_CONNECTING || 1244 server.repl_state == REDIS_REPL_RECEIVE_PONG) 1245 { 1246 undoConnectWithMaster(); 1247 } else { 1248 return 0; 1249 } 1250 return 1; 1251 } 1252 1253 /* Set replication to the specified master address and port. */ 1254 void replicationSetMaster(char *ip, int port) { 1255 sdsfree(server.masterhost); 1256 server.masterhost = sdsnew(ip); 1257 server.masterport = port; 1258 if (server.master) freeClient(server.master); 1259 disconnectSlaves(); /* Force our slaves to resync with us as well. */ 1260 replicationDiscardCachedMaster(); /* Don't try a PSYNC. */ 1261 freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ 1262 cancelReplicationHandshake(); 1263 server.repl_state = REDIS_REPL_CONNECT; 1264 server.master_repl_offset = 0; 1265 server.repl_down_since = 0; 1266 } 1267 1268 /* Cancel replication, setting the instance as a master itself. */ 1269 void replicationUnsetMaster(void) { 1270 if (server.masterhost == NULL) return; /* Nothing to do. 
*/ 1271 sdsfree(server.masterhost); 1272 server.masterhost = NULL; 1273 if (server.master) { 1274 if (listLength(server.slaves) == 0) { 1275 /* If this instance is turned into a master and there are no 1276 * slaves, it inherits the replication offset from the master. 1277 * Under certain conditions this makes replicas comparable by 1278 * replication offset to understand what is the most updated. */ 1279 server.master_repl_offset = server.master->reploff; 1280 freeReplicationBacklog(); 1281 } 1282 freeClient(server.master); 1283 } 1284 replicationDiscardCachedMaster(); 1285 cancelReplicationHandshake(); 1286 server.repl_state = REDIS_REPL_NONE; 1287 } 1288 1289 void slaveofCommand(redisClient *c) { 1290 /* SLAVEOF is not allowed in cluster mode as replication is automatically 1291 * configured using the current address of the master node. */ 1292 if (server.cluster_enabled) { 1293 addReplyError(c,"SLAVEOF not allowed in cluster mode."); 1294 return; 1295 } 1296 1297 /* The special host/port combination "NO" "ONE" turns the instance 1298 * into a master. Otherwise the new master address is set. */ 1299 if (!strcasecmp(c->argv[1]->ptr,"no") && 1300 !strcasecmp(c->argv[2]->ptr,"one")) { 1301 if (server.masterhost) { 1302 replicationUnsetMaster(); 1303 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)"); 1304 } 1305 } else { 1306 long port; 1307 1308 if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK)) 1309 return; 1310 1311 /* Check if we are already attached to the specified slave */ 1312 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) 1313 && server.masterport == port) { 1314 redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed."); 1315 addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); 1316 return; 1317 } 1318 /* There was no previous master or the user specified a different one, 1319 * we can continue. 
*/ 1320 replicationSetMaster(c->argv[1]->ptr, port); 1321 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)", 1322 server.masterhost, server.masterport); 1323 } 1324 addReply(c,shared.ok); 1325 } 1326 1327 /* ROLE command: provide information about the role of the instance 1328 * (master or slave) and additional information related to replication 1329 * in an easy to process format. */ 1330 void roleCommand(redisClient *c) { 1331 if (server.masterhost == NULL) { 1332 listIter li; 1333 listNode *ln; 1334 void *mbcount; 1335 int slaves = 0; 1336 1337 addReplyMultiBulkLen(c,3); 1338 addReplyBulkCBuffer(c,"master",6); 1339 addReplyLongLong(c,server.master_repl_offset); 1340 mbcount = addDeferredMultiBulkLength(c); 1341 listRewind(server.slaves,&li); 1342 while((ln = listNext(&li))) { 1343 redisClient *slave = ln->value; 1344 char ip[REDIS_IP_STR_LEN]; 1345 1346 if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1) continue; 1347 if (slave->replstate != REDIS_REPL_ONLINE) continue; 1348 addReplyMultiBulkLen(c,3); 1349 addReplyBulkCString(c,ip); 1350 addReplyBulkLongLong(c,slave->slave_listening_port); 1351 addReplyBulkLongLong(c,slave->repl_ack_off); 1352 slaves++; 1353 } 1354 setDeferredMultiBulkLength(c,mbcount,slaves); 1355 } else { 1356 char *slavestate = NULL; 1357 1358 addReplyMultiBulkLen(c,5); 1359 addReplyBulkCBuffer(c,"slave",5); 1360 addReplyBulkCString(c,server.masterhost); 1361 addReplyLongLong(c,server.masterport); 1362 switch(server.repl_state) { 1363 case REDIS_REPL_NONE: slavestate = "none"; break; 1364 case REDIS_REPL_CONNECT: slavestate = "connect"; break; 1365 case REDIS_REPL_CONNECTING: slavestate = "connecting"; break; 1366 case REDIS_REPL_RECEIVE_PONG: /* see next */ 1367 case REDIS_REPL_TRANSFER: slavestate = "sync"; break; 1368 case REDIS_REPL_CONNECTED: slavestate = "connected"; break; 1369 default: slavestate = "unknown"; break; 1370 } 1371 addReplyBulkCString(c,slavestate); 1372 addReplyLongLong(c,server.master ? 
server.master->reploff : -1); 1373 } 1374 } 1375 1376 /* Send a REPLCONF ACK command to the master to inform it about the current 1377 * processed offset. If we are not connected with a master, the command has 1378 * no effects. */ 1379 void replicationSendAck(void) { 1380 redisClient *c = server.master; 1381 1382 if (c != NULL) { 1383 c->flags |= REDIS_MASTER_FORCE_REPLY; 1384 addReplyMultiBulkLen(c,3); 1385 addReplyBulkCString(c,"REPLCONF"); 1386 addReplyBulkCString(c,"ACK"); 1387 addReplyBulkLongLong(c,c->reploff); 1388 c->flags &= ~REDIS_MASTER_FORCE_REPLY; 1389 } 1390 } 1391 1392 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */ 1393 1394 /* In order to implement partial synchronization we need to be able to cache 1395 * our master's client structure after a transient disconnection. 1396 * It is cached into server.cached_master and flushed away using the following 1397 * functions. */ 1398 1399 /* This function is called by freeClient() in order to cache the master 1400 * client structure instead of destryoing it. freeClient() will return 1401 * ASAP after this function returns, so every action needed to avoid problems 1402 * with a client that is really "suspended" has to be done by this function. 1403 * 1404 * The other functions that will deal with the cached master are: 1405 * 1406 * replicationDiscardCachedMaster() that will make sure to kill the client 1407 * as for some reason we don't want to use it in the future. 1408 * 1409 * replicationResurrectCachedMaster() that is used after a successful PSYNC 1410 * handshake in order to reactivate the cached master. 
1411 */ 1412 void replicationCacheMaster(redisClient *c) { 1413 listNode *ln; 1414 1415 redisAssert(server.master != NULL && server.cached_master == NULL); 1416 redisLog(REDIS_NOTICE,"Caching the disconnected master state."); 1417 1418 /* Remove from the list of clients, we don't want this client to be 1419 * listed by CLIENT LIST or processed in any way by batch operations. */ 1420 ln = listSearchKey(server.clients,c); 1421 redisAssert(ln != NULL); 1422 listDelNode(server.clients,ln); 1423 1424 /* Save the master. Server.master will be set to null later by 1425 * replicationHandleMasterDisconnection(). */ 1426 server.cached_master = server.master; 1427 1428 /* Remove the event handlers and close the socket. We'll later reuse 1429 * the socket of the new connection with the master during PSYNC. */ 1430 aeDeleteFileEvent(server.el,c->fd,AE_READABLE); 1431 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); 1432 close(c->fd); 1433 1434 /* Set fd to -1 so that we can safely call freeClient(c) later. */ 1435 c->fd = -1; 1436 1437 /* Invalidate the Peer ID cache. */ 1438 if (c->peerid) { 1439 sdsfree(c->peerid); 1440 c->peerid = NULL; 1441 } 1442 1443 /* Caching the master happens instead of the actual freeClient() call, 1444 * so make sure to adjust the replication state. This function will 1445 * also set server.master to NULL. */ 1446 replicationHandleMasterDisconnection(); 1447 } 1448 1449 /* Free a cached master, called when there are no longer the conditions for 1450 * a partial resync on reconnection. */ 1451 void replicationDiscardCachedMaster(void) { 1452 if (server.cached_master == NULL) return; 1453 1454 redisLog(REDIS_NOTICE,"Discarding previously cached master state."); 1455 server.cached_master->flags &= ~REDIS_MASTER; 1456 freeClient(server.cached_master); 1457 server.cached_master = NULL; 1458 } 1459 1460 /* Turn the cached master into the current master, using the file descriptor 1461 * passed as argument as the socket for the new master. 
1462 * 1463 * This funciton is called when successfully setup a partial resynchronization 1464 * so the stream of data that we'll receive will start from were this 1465 * master left. */ 1466 void replicationResurrectCachedMaster(int newfd) { 1467 server.master = server.cached_master; 1468 server.cached_master = NULL; 1469 server.master->fd = newfd; 1470 server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP); 1471 server.master->authenticated = 1; 1472 server.master->lastinteraction = server.unixtime; 1473 server.repl_state = REDIS_REPL_CONNECTED; 1474 1475 /* Re-add to the list of clients. */ 1476 listAddNodeTail(server.clients,server.master); 1477 if (aeCreateFileEvent(server.el, newfd, AE_READABLE, 1478 readQueryFromClient, server.master)) { 1479 redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); 1480 freeClientAsync(server.master); /* Close ASAP. */ 1481 } 1482 1483 /* We may also need to install the write handler as well if there is 1484 * pending data in the write buffers. */ 1485 if (server.master->bufpos || listLength(server.master->reply)) { 1486 if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, 1487 sendReplyToClient, server.master)) { 1488 redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); 1489 freeClientAsync(server.master); /* Close ASAP. */ 1490 } 1491 } 1492 } 1493 1494 /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */ 1495 1496 /* This function counts the number of slaves with lag <= min-slaves-max-lag. 1497 * If the option is active, the server will prevent writes if there are not 1498 * enough connected slaves with the specified lag (or less). 
*/
void refreshGoodSlavesCount(void) {
    listIter iter;
    listNode *node;
    int count = 0;

    /* Nothing to compute unless both options are set to non-zero values. */
    if (server.repl_min_slaves_to_write == 0 ||
        server.repl_min_slaves_max_lag == 0) return;

    listRewind(server.slaves,&iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        redisClient *slave = node->value;
        /* Seconds elapsed since the last ACK received from this slave. */
        time_t lag = server.unixtime - slave->repl_ack_time;

        if (slave->replstate != REDIS_REPL_ONLINE) continue;
        if (lag <= server.repl_min_slaves_max_lag) count++;
    }
    server.repl_good_slaves_count = count;
}

/* ----------------------- REPLICATION SCRIPT CACHE --------------------------
 * The goal of this code is to keep track of scripts already sent to every
 * connected slave, in order to be able to replicate EVALSHA as it is without
 * translating it to EVAL every time it is possible.
 *
 * We use a capped collection implemented by a hash table for fast lookup
 * of scripts we can send as EVALSHA, plus a linked list that is used for
 * eviction of the oldest entry when the max number of items is reached.
 *
 * We don't care about taking a different cache for every different slave
 * since to fill the cache again is not very costly, the goal of this code
 * is to avoid that the same big script is transmitted a big number of times
 * per second wasting bandwidth and processor speed, but it is not a problem
 * if we need to rebuild the cache from scratch from time to time, every used
 * script will need to be transmitted a single time to reappear in the cache.
 *
 * This is how the system works:
 *
 * 1) Every time a new slave connects, we flush the whole script cache.
 * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
 *    trying to convert EVAL into EVALSHA specifically for slaves.
1539 * 3) Every time we trasmit a script as EVAL to the slaves, we also add the 1540 * corresponding SHA1 of the script into the cache as we are sure every 1541 * slave knows about the script starting from now. 1542 * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves 1543 * and at the same time flush the script cache. 1544 * 5) When the last slave disconnects, flush the cache. 1545 * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded 1546 * in the master sometimes. 1547 */ 1548 1549 /* Initialize the script cache, only called at startup. */ 1550 void replicationScriptCacheInit(void) { 1551 server.repl_scriptcache_size = 10000; 1552 server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL); 1553 server.repl_scriptcache_fifo = listCreate(); 1554 } 1555 1556 /* Empty the script cache. Should be called every time we are no longer sure 1557 * that every slave knows about all the scripts in our set, or when the 1558 * current AOF "context" is no longer aware of the script. In general we 1559 * should flush the cache: 1560 * 1561 * 1) Every time a new slave reconnects to this master and performs a 1562 * full SYNC (PSYNC does not require flushing). 1563 * 2) Every time an AOF rewrite is performed. 1564 * 3) Every time we are left without slaves at all, and AOF is off, in order 1565 * to reclaim otherwise unused memory. 1566 */ 1567 void replicationScriptCacheFlush(void) { 1568 dictEmpty(server.repl_scriptcache_dict,NULL); 1569 listRelease(server.repl_scriptcache_fifo); 1570 server.repl_scriptcache_fifo = listCreate(); 1571 } 1572 1573 /* Add an entry into the script cache, if we reach max number of entries the 1574 * oldest is removed from the list. */ 1575 void replicationScriptCacheAdd(sds sha1) { 1576 int retval; 1577 sds key = sdsdup(sha1); 1578 1579 /* Evict oldest. 
*/ 1580 if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size) 1581 { 1582 listNode *ln = listLast(server.repl_scriptcache_fifo); 1583 sds oldest = listNodeValue(ln); 1584 1585 retval = dictDelete(server.repl_scriptcache_dict,oldest); 1586 redisAssert(retval == DICT_OK); 1587 listDelNode(server.repl_scriptcache_fifo,ln); 1588 } 1589 1590 /* Add current. */ 1591 retval = dictAdd(server.repl_scriptcache_dict,key,NULL); 1592 listAddNodeHead(server.repl_scriptcache_fifo,key); 1593 redisAssert(retval == DICT_OK); 1594 } 1595 1596 /* Returns non-zero if the specified entry exists inside the cache, that is, 1597 * if all the slaves are aware of this script SHA1. */ 1598 int replicationScriptCacheExists(sds sha1) { 1599 return dictFind(server.repl_scriptcache_dict,sha1) != NULL; 1600 } 1601 1602 /* ----------------------- SYNCHRONOUS REPLICATION -------------------------- 1603 * Redis synchronous replication design can be summarized in points: 1604 * 1605 * - Redis masters have a global replication offset, used by PSYNC. 1606 * - Master increment the offset every time new commands are sent to slaves. 1607 * - Slaves ping back masters with the offset processed so far. 1608 * 1609 * So synchronous replication adds a new WAIT command in the form: 1610 * 1611 * WAIT <num_replicas> <milliseconds_timeout> 1612 * 1613 * That returns the number of replicas that processed the query when 1614 * we finally have at least num_replicas, or when the timeout was 1615 * reached. 1616 * 1617 * The command is implemented in this way: 1618 * 1619 * - Every time a client processes a command, we remember the replication 1620 * offset after sending that command to the slaves. 1621 * - When WAIT is called, we ask slaves to send an acknowledgement ASAP. 1622 * The client is blocked at the same time (see blocked.c). 
1623 * - Once we receive enough ACKs for a given offset or when the timeout 1624 * is reached, the WAIT command is unblocked and the reply sent to the 1625 * client. 1626 */ 1627 1628 /* This just set a flag so that we broadcast a REPLCONF GETACK command 1629 * to all the slaves in the beforeSleep() function. Note that this way 1630 * we "group" all the clients that want to wait for synchronouns replication 1631 * in a given event loop iteration, and send a single GETACK for them all. */ 1632 void replicationRequestAckFromSlaves(void) { 1633 server.get_ack_from_slaves = 1; 1634 } 1635 1636 /* Return the number of slaves that already acknowledged the specified 1637 * replication offset. */ 1638 int replicationCountAcksByOffset(long long offset) { 1639 listIter li; 1640 listNode *ln; 1641 int count = 0; 1642 1643 listRewind(server.slaves,&li); 1644 while((ln = listNext(&li))) { 1645 redisClient *slave = ln->value; 1646 1647 if (slave->replstate != REDIS_REPL_ONLINE) continue; 1648 if (slave->repl_ack_off >= offset) count++; 1649 } 1650 return count; 1651 } 1652 1653 /* WAIT for N replicas to acknowledge the processing of our latest 1654 * write command (and all the previous commands). */ 1655 void waitCommand(redisClient *c) { 1656 mstime_t timeout; 1657 long numreplicas, ackreplicas; 1658 long long offset = c->woff; 1659 1660 /* Argument parsing. */ 1661 if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != REDIS_OK) 1662 return; 1663 if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS) 1664 != REDIS_OK) return; 1665 1666 /* First try without blocking at all. */ 1667 ackreplicas = replicationCountAcksByOffset(c->woff); 1668 if (ackreplicas >= numreplicas || c->flags & REDIS_MULTI) { 1669 addReplyLongLong(c,ackreplicas); 1670 return; 1671 } 1672 1673 /* Otherwise block the client and put it into our list of clients 1674 * waiting for ack from slaves. 
*/ 1675 c->bpop.timeout = timeout; 1676 c->bpop.reploffset = offset; 1677 c->bpop.numreplicas = numreplicas; 1678 listAddNodeTail(server.clients_waiting_acks,c); 1679 blockClient(c,REDIS_BLOCKED_WAIT); 1680 1681 /* Make sure that the server will send an ACK request to all the slaves 1682 * before returning to the event loop. */ 1683 replicationRequestAckFromSlaves(); 1684 } 1685 1686 /* This is called by unblockClient() to perform the blocking op type 1687 * specific cleanup. We just remove the client from the list of clients 1688 * waiting for replica acks. Never call it directly, call unblockClient() 1689 * instead. */ 1690 void unblockClientWaitingReplicas(redisClient *c) { 1691 listNode *ln = listSearchKey(server.clients_waiting_acks,c); 1692 redisAssert(ln != NULL); 1693 listDelNode(server.clients_waiting_acks,ln); 1694 } 1695 1696 /* Check if there are clients blocked in WAIT that can be unblocked since 1697 * we received enough ACKs from slaves. */ 1698 void processClientsWaitingReplicas(void) { 1699 long long last_offset = 0; 1700 int last_numreplicas = 0; 1701 1702 listIter li; 1703 listNode *ln; 1704 1705 listRewind(server.clients_waiting_acks,&li); 1706 while((ln = listNext(&li))) { 1707 redisClient *c = ln->value; 1708 1709 /* Every time we find a client that is satisfied for a given 1710 * offset and number of replicas, we remember it so the next client 1711 * may be unblocked without calling replicationCountAcksByOffset() 1712 * if the requested offset / replicas were equal or less. 
*/ 1713 if (last_offset && last_offset > c->bpop.reploffset && 1714 last_numreplicas > c->bpop.numreplicas) 1715 { 1716 unblockClient(c); 1717 addReplyLongLong(c,last_numreplicas); 1718 } else { 1719 int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset); 1720 1721 if (numreplicas >= c->bpop.numreplicas) { 1722 last_offset = c->bpop.reploffset; 1723 last_numreplicas = numreplicas; 1724 unblockClient(c); 1725 addReplyLongLong(c,numreplicas); 1726 } 1727 } 1728 } 1729 } 1730 1731 /* Return the slave replication offset for this instance, that is 1732 * the offset for which we already processed the master replication stream. */ 1733 long long replicationGetSlaveOffset(void) { 1734 long long offset = 0; 1735 1736 if (server.masterhost != NULL) { 1737 if (server.master) { 1738 offset = server.master->reploff; 1739 } else if (server.cached_master) { 1740 offset = server.cached_master->reploff; 1741 } 1742 } 1743 /* offset may be -1 when the master does not support it at all, however 1744 * this function is designed to return an offset that can express the 1745 * amount of data processed by the master, so we return a positive 1746 * integer. */ 1747 if (offset < 0) offset = 0; 1748 return offset; 1749 } 1750 1751 /* --------------------------- REPLICATION CRON ---------------------------- */ 1752 1753 /* Replication cron funciton, called 1 time per second. */ 1754 void replicationCron(void) { 1755 /* Non blocking connection timeout? */ 1756 if (server.masterhost && 1757 (server.repl_state == REDIS_REPL_CONNECTING || 1758 server.repl_state == REDIS_REPL_RECEIVE_PONG) && 1759 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) 1760 { 1761 redisLog(REDIS_WARNING,"Timeout connecting to the MASTER..."); 1762 undoConnectWithMaster(); 1763 } 1764 1765 /* Bulk transfer I/O timeout? 
*/ 1766 if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER && 1767 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) 1768 { 1769 redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value."); 1770 replicationAbortSyncTransfer(); 1771 } 1772 1773 /* Timed out master when we are an already connected slave? */ 1774 if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED && 1775 (time(NULL)-server.master->lastinteraction) > server.repl_timeout) 1776 { 1777 redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received..."); 1778 freeClient(server.master); 1779 } 1780 1781 /* Check if we should connect to a MASTER */ 1782 if (server.repl_state == REDIS_REPL_CONNECT) { 1783 redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d", 1784 server.masterhost, server.masterport); 1785 if (connectWithMaster() == REDIS_OK) { 1786 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started"); 1787 } 1788 } 1789 1790 /* Send ACK to master from time to time. 1791 * Note that we do not send periodic acks to masters that don't 1792 * support PSYNC and replication offsets. */ 1793 if (server.masterhost && server.master && 1794 !(server.master->flags & REDIS_PRE_PSYNC)) 1795 replicationSendAck(); 1796 1797 /* If we have attached slaves, PING them from time to time. 1798 * So slaves can implement an explicit timeout to masters, and will 1799 * be able to detect a link disconnection even if the TCP connection 1800 * will not actually go down. 
*/ 1801 if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) { 1802 listIter li; 1803 listNode *ln; 1804 robj *ping_argv[1]; 1805 1806 /* First, send PING */ 1807 ping_argv[0] = createStringObject("PING",4); 1808 replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); 1809 decrRefCount(ping_argv[0]); 1810 1811 /* Second, send a newline to all the slaves in pre-synchronization 1812 * stage, that is, slaves waiting for the master to create the RDB file. 1813 * The newline will be ignored by the slave but will refresh the 1814 * last-io timer preventing a timeout. */ 1815 listRewind(server.slaves,&li); 1816 while((ln = listNext(&li))) { 1817 redisClient *slave = ln->value; 1818 1819 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START || 1820 slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { 1821 if (write(slave->fd, "\n", 1) == -1) { 1822 /* Don't worry, it's just a ping. */ 1823 } 1824 } 1825 } 1826 } 1827 1828 /* Disconnect timedout slaves. */ 1829 if (listLength(server.slaves)) { 1830 listIter li; 1831 listNode *ln; 1832 1833 listRewind(server.slaves,&li); 1834 while((ln = listNext(&li))) { 1835 redisClient *slave = ln->value; 1836 1837 if (slave->replstate != REDIS_REPL_ONLINE) continue; 1838 if (slave->flags & REDIS_PRE_PSYNC) continue; 1839 if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) 1840 { 1841 char ip[REDIS_IP_STR_LEN]; 1842 int port; 1843 1844 if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) { 1845 redisLog(REDIS_WARNING, 1846 "Disconnecting timedout slave: %s:%d", 1847 ip, slave->slave_listening_port); 1848 } 1849 freeClient(slave); 1850 } 1851 } 1852 } 1853 1854 /* If we have no attached slaves and there is a replication backlog 1855 * using memory, free it after some (configured) time. 
*/ 1856 if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && 1857 server.repl_backlog) 1858 { 1859 time_t idle = server.unixtime - server.repl_no_slaves_since; 1860 1861 if (idle > server.repl_backlog_time_limit) { 1862 freeReplicationBacklog(); 1863 redisLog(REDIS_NOTICE, 1864 "Replication backlog freed after %d seconds " 1865 "without connected slaves.", 1866 (int) server.repl_backlog_time_limit); 1867 } 1868 } 1869 1870 /* If AOF is disabled and we no longer have attached slaves, we can 1871 * free our Replication Script Cache as there is no need to propagate 1872 * EVALSHA at all. */ 1873 if (listLength(server.slaves) == 0 && 1874 server.aof_state == REDIS_AOF_OFF && 1875 listLength(server.repl_scriptcache_fifo) != 0) 1876 { 1877 replicationScriptCacheFlush(); 1878 } 1879 1880 /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ 1881 refreshGoodSlavesCount(); 1882 } 1883