/* Asynchronous replication implementation.
 *
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
29 */ 30 31 32 #include "server.h" 33 #include "cluster.h" 34 35 #include <sys/time.h> 36 #include <unistd.h> 37 #include <fcntl.h> 38 #include <sys/socket.h> 39 #include <sys/stat.h> 40 41 void replicationDiscardCachedMaster(void); 42 void replicationResurrectCachedMaster(int newfd); 43 void replicationSendAck(void); 44 void putSlaveOnline(client *slave); 45 int cancelReplicationHandshake(void); 46 47 /* --------------------------- Utility functions ---------------------------- */ 48 49 /* Return the pointer to a string representing the slave ip:listening_port 50 * pair. Mostly useful for logging, since we want to log a slave using its 51 * IP address and its listening port which is more clear for the user, for 52 * example: "Closing connection with replica 10.1.2.3:6380". */ 53 char *replicationGetSlaveName(client *c) { 54 static char buf[NET_PEER_ID_LEN]; 55 char ip[NET_IP_STR_LEN]; 56 57 ip[0] = '\0'; 58 buf[0] = '\0'; 59 if (c->slave_ip[0] != '\0' || 60 anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) 61 { 62 /* Note that the 'ip' buffer is always larger than 'c->slave_ip' */ 63 if (c->slave_ip[0] != '\0') memcpy(ip,c->slave_ip,sizeof(c->slave_ip)); 64 65 if (c->slave_listening_port) 66 anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port); 67 else 68 snprintf(buf,sizeof(buf),"%s:<unknown-replica-port>",ip); 69 } else { 70 snprintf(buf,sizeof(buf),"client id #%llu", 71 (unsigned long long) c->id); 72 } 73 return buf; 74 } 75 76 /* ---------------------------------- MASTER -------------------------------- */ 77 78 void createReplicationBacklog(void) { 79 serverAssert(server.repl_backlog == NULL); 80 server.repl_backlog = zmalloc(server.repl_backlog_size); 81 server.repl_backlog_histlen = 0; 82 server.repl_backlog_idx = 0; 83 84 /* We don't have any data inside our buffer, but virtually the first 85 * byte we have is the next byte that will be generated for the 86 * replication stream. 
*/ 87 server.repl_backlog_off = server.master_repl_offset+1; 88 } 89 90 /* This function is called when the user modifies the replication backlog 91 * size at runtime. It is up to the function to both update the 92 * server.repl_backlog_size and to resize the buffer and setup it so that 93 * it contains the same data as the previous one (possibly less data, but 94 * the most recent bytes, or the same data and more free space in case the 95 * buffer is enlarged). */ 96 void resizeReplicationBacklog(long long newsize) { 97 if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE) 98 newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; 99 if (server.repl_backlog_size == newsize) return; 100 101 server.repl_backlog_size = newsize; 102 if (server.repl_backlog != NULL) { 103 /* What we actually do is to flush the old buffer and realloc a new 104 * empty one. It will refill with new data incrementally. 105 * The reason is that copying a few gigabytes adds latency and even 106 * worse often we need to alloc additional space before freeing the 107 * old buffer. */ 108 zfree(server.repl_backlog); 109 server.repl_backlog = zmalloc(server.repl_backlog_size); 110 server.repl_backlog_histlen = 0; 111 server.repl_backlog_idx = 0; 112 /* Next byte we have is... the next since the buffer is empty. */ 113 server.repl_backlog_off = server.master_repl_offset+1; 114 } 115 } 116 117 void freeReplicationBacklog(void) { 118 serverAssert(listLength(server.slaves) == 0); 119 zfree(server.repl_backlog); 120 server.repl_backlog = NULL; 121 } 122 123 /* Add data to the replication backlog. 124 * This function also increments the global replication offset stored at 125 * server.master_repl_offset, because there is no case where we want to feed 126 * the backlog without incrementing the offset. 
*/ 127 void feedReplicationBacklog(void *ptr, size_t len) { 128 unsigned char *p = ptr; 129 130 server.master_repl_offset += len; 131 132 /* This is a circular buffer, so write as much data we can at every 133 * iteration and rewind the "idx" index if we reach the limit. */ 134 while(len) { 135 size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; 136 if (thislen > len) thislen = len; 137 memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); 138 server.repl_backlog_idx += thislen; 139 if (server.repl_backlog_idx == server.repl_backlog_size) 140 server.repl_backlog_idx = 0; 141 len -= thislen; 142 p += thislen; 143 server.repl_backlog_histlen += thislen; 144 } 145 if (server.repl_backlog_histlen > server.repl_backlog_size) 146 server.repl_backlog_histlen = server.repl_backlog_size; 147 /* Set the offset of the first byte we have in the backlog. */ 148 server.repl_backlog_off = server.master_repl_offset - 149 server.repl_backlog_histlen + 1; 150 } 151 152 /* Wrapper for feedReplicationBacklog() that takes Redis string objects 153 * as input. */ 154 void feedReplicationBacklogWithObject(robj *o) { 155 char llstr[LONG_STR_SIZE]; 156 void *p; 157 size_t len; 158 159 if (o->encoding == OBJ_ENCODING_INT) { 160 len = ll2string(llstr,sizeof(llstr),(long)o->ptr); 161 p = llstr; 162 } else { 163 len = sdslen(o->ptr); 164 p = o->ptr; 165 } 166 feedReplicationBacklog(p,len); 167 } 168 169 /* Propagate write commands to slaves, and populate the replication backlog 170 * as well. This function is used if the instance is a master: we use 171 * the commands received by our clients in order to create the replication 172 * stream. 
Instead if the instance is a slave and has sub-slaves attached, 173 * we use replicationFeedSlavesFromMaster() */ 174 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { 175 listNode *ln; 176 listIter li; 177 int j, len; 178 char llstr[LONG_STR_SIZE]; 179 180 /* If the instance is not a top level master, return ASAP: we'll just proxy 181 * the stream of data we receive from our master instead, in order to 182 * propagate *identical* replication stream. In this way this slave can 183 * advertise the same replication ID as the master (since it shares the 184 * master replication history and has the same backlog and offsets). */ 185 if (server.masterhost != NULL) return; 186 187 /* If there aren't slaves, and there is no backlog buffer to populate, 188 * we can return ASAP. */ 189 if (server.repl_backlog == NULL && listLength(slaves) == 0) return; 190 191 /* We can't have slaves attached and no backlog. */ 192 serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); 193 194 /* Send SELECT command to every slave if needed. */ 195 if (server.slaveseldb != dictid) { 196 robj *selectcmd; 197 198 /* For a few DBs we have pre-computed SELECT command. */ 199 if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { 200 selectcmd = shared.select[dictid]; 201 } else { 202 int dictid_len; 203 204 dictid_len = ll2string(llstr,sizeof(llstr),dictid); 205 selectcmd = createObject(OBJ_STRING, 206 sdscatprintf(sdsempty(), 207 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", 208 dictid_len, llstr)); 209 } 210 211 /* Add the SELECT command into the backlog. */ 212 if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); 213 214 /* Send it to slaves. 
*/ 215 listRewind(slaves,&li); 216 while((ln = listNext(&li))) { 217 client *slave = ln->value; 218 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; 219 addReply(slave,selectcmd); 220 } 221 222 if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) 223 decrRefCount(selectcmd); 224 } 225 server.slaveseldb = dictid; 226 227 /* Write the command to the replication backlog if any. */ 228 if (server.repl_backlog) { 229 char aux[LONG_STR_SIZE+3]; 230 231 /* Add the multi bulk reply length. */ 232 aux[0] = '*'; 233 len = ll2string(aux+1,sizeof(aux)-1,argc); 234 aux[len+1] = '\r'; 235 aux[len+2] = '\n'; 236 feedReplicationBacklog(aux,len+3); 237 238 for (j = 0; j < argc; j++) { 239 long objlen = stringObjectLen(argv[j]); 240 241 /* We need to feed the buffer with the object as a bulk reply 242 * not just as a plain string, so create the $..CRLF payload len 243 * and add the final CRLF */ 244 aux[0] = '$'; 245 len = ll2string(aux+1,sizeof(aux)-1,objlen); 246 aux[len+1] = '\r'; 247 aux[len+2] = '\n'; 248 feedReplicationBacklog(aux,len+3); 249 feedReplicationBacklogWithObject(argv[j]); 250 feedReplicationBacklog(aux+len+1,2); 251 } 252 } 253 254 /* Write the command to every slave. */ 255 listRewind(slaves,&li); 256 while((ln = listNext(&li))) { 257 client *slave = ln->value; 258 259 /* Don't feed slaves that are still waiting for BGSAVE to start */ 260 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; 261 262 /* Feed slaves that are waiting for the initial SYNC (so these commands 263 * are queued in the output buffer until the initial SYNC completes), 264 * or are already in sync with the master. */ 265 266 /* Add the multi bulk length. */ 267 addReplyMultiBulkLen(slave,argc); 268 269 /* Finally any additional argument that was not stored inside the 270 * static buffer if any (from j to argc). 
*/ 271 for (j = 0; j < argc; j++) 272 addReplyBulk(slave,argv[j]); 273 } 274 } 275 276 /* This function is used in order to proxy what we receive from our master 277 * to our sub-slaves. */ 278 #include <ctype.h> 279 void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) { 280 listNode *ln; 281 listIter li; 282 283 /* Debugging: this is handy to see the stream sent from master 284 * to slaves. Disabled with if(0). */ 285 if (0) { 286 printf("%zu:",buflen); 287 for (size_t j = 0; j < buflen; j++) { 288 printf("%c", isprint(buf[j]) ? buf[j] : '.'); 289 } 290 printf("\n"); 291 } 292 293 if (server.repl_backlog) feedReplicationBacklog(buf,buflen); 294 listRewind(slaves,&li); 295 while((ln = listNext(&li))) { 296 client *slave = ln->value; 297 298 /* Don't feed slaves that are still waiting for BGSAVE to start */ 299 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; 300 addReplyString(slave,buf,buflen); 301 } 302 } 303 304 void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { 305 listNode *ln; 306 listIter li; 307 int j; 308 sds cmdrepr = sdsnew("+"); 309 robj *cmdobj; 310 struct timeval tv; 311 312 gettimeofday(&tv,NULL); 313 cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); 314 if (c->flags & CLIENT_LUA) { 315 cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); 316 } else if (c->flags & CLIENT_UNIX_SOCKET) { 317 cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); 318 } else { 319 cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c)); 320 } 321 322 for (j = 0; j < argc; j++) { 323 if (argv[j]->encoding == OBJ_ENCODING_INT) { 324 cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr); 325 } else { 326 cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, 327 sdslen(argv[j]->ptr)); 328 } 329 if (j != argc-1) 330 cmdrepr = sdscatlen(cmdrepr," ",1); 331 } 332 cmdrepr = sdscatlen(cmdrepr,"\r\n",2); 333 cmdobj = 
createObject(OBJ_STRING,cmdrepr); 334 335 listRewind(monitors,&li); 336 while((ln = listNext(&li))) { 337 client *monitor = ln->value; 338 addReply(monitor,cmdobj); 339 } 340 decrRefCount(cmdobj); 341 } 342 343 /* Feed the slave 'c' with the replication backlog starting from the 344 * specified 'offset' up to the end of the backlog. */ 345 long long addReplyReplicationBacklog(client *c, long long offset) { 346 long long j, skip, len; 347 348 serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset); 349 350 if (server.repl_backlog_histlen == 0) { 351 serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); 352 return 0; 353 } 354 355 serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", 356 server.repl_backlog_size); 357 serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", 358 server.repl_backlog_off); 359 serverLog(LL_DEBUG, "[PSYNC] History len: %lld", 360 server.repl_backlog_histlen); 361 serverLog(LL_DEBUG, "[PSYNC] Current index: %lld", 362 server.repl_backlog_idx); 363 364 /* Compute the amount of bytes we need to discard. */ 365 skip = offset - server.repl_backlog_off; 366 serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); 367 368 /* Point j to the oldest byte, that is actually our 369 * server.repl_backlog_off byte. */ 370 j = (server.repl_backlog_idx + 371 (server.repl_backlog_size-server.repl_backlog_histlen)) % 372 server.repl_backlog_size; 373 serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j); 374 375 /* Discard the amount of data to seek to the specified 'offset'. */ 376 j = (j + skip) % server.repl_backlog_size; 377 378 /* Feed slave with data. Since it is a circular buffer we have to 379 * split the reply in two parts if we are cross-boundary. */ 380 len = server.repl_backlog_histlen - skip; 381 serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); 382 while(len) { 383 long long thislen = 384 ((server.repl_backlog_size - j) < len) ? 
385 (server.repl_backlog_size - j) : len; 386 387 serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen); 388 addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); 389 len -= thislen; 390 j = 0; 391 } 392 return server.repl_backlog_histlen - skip; 393 } 394 395 /* Return the offset to provide as reply to the PSYNC command received 396 * from the slave. The returned value is only valid immediately after 397 * the BGSAVE process started and before executing any other command 398 * from clients. */ 399 long long getPsyncInitialOffset(void) { 400 return server.master_repl_offset; 401 } 402 403 /* Send a FULLRESYNC reply in the specific case of a full resynchronization, 404 * as a side effect setup the slave for a full sync in different ways: 405 * 406 * 1) Remember, into the slave client structure, the replication offset 407 * we sent here, so that if new slaves will later attach to the same 408 * background RDB saving process (by duplicating this client output 409 * buffer), we can get the right offset from this slave. 410 * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that 411 * we start accumulating differences from this point. 412 * 3) Force the replication stream to re-emit a SELECT statement so 413 * the new slave incremental differences will start selecting the 414 * right database number. 415 * 416 * Normally this function should be called immediately after a successful 417 * BGSAVE for replication was started, or when there is one already in 418 * progress that we attached our slave to. */ 419 int replicationSetupSlaveForFullResync(client *slave, long long offset) { 420 char buf[128]; 421 int buflen; 422 423 slave->psync_initial_offset = offset; 424 slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; 425 /* We are going to accumulate the incremental changes for this 426 * slave as well. Set slaveseldb to -1 in order to force to re-emit 427 * a SELECT statement in the replication stream. 
*/ 428 server.slaveseldb = -1; 429 430 /* Don't send this reply to slaves that approached us with 431 * the old SYNC command. */ 432 if (!(slave->flags & CLIENT_PRE_PSYNC)) { 433 buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", 434 server.replid,offset); 435 if (write(slave->fd,buf,buflen) != buflen) { 436 freeClientAsync(slave); 437 return C_ERR; 438 } 439 } 440 return C_OK; 441 } 442 443 /* This function handles the PSYNC command from the point of view of a 444 * master receiving a request for partial resynchronization. 445 * 446 * On success return C_OK, otherwise C_ERR is returned and we proceed 447 * with the usual full resync. */ 448 int masterTryPartialResynchronization(client *c) { 449 long long psync_offset, psync_len; 450 char *master_replid = c->argv[1]->ptr; 451 char buf[128]; 452 int buflen; 453 454 /* Parse the replication offset asked by the slave. Go to full sync 455 * on parse error: this should never happen but we try to handle 456 * it in a robust way compared to aborting. */ 457 if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != 458 C_OK) goto need_full_resync; 459 460 /* Is the replication ID of this master the same advertised by the wannabe 461 * slave via PSYNC? If the replication ID changed this master has a 462 * different replication history, and there is no way to continue. 463 * 464 * Note that there are two potentially valid replication IDs: the ID1 465 * and the ID2. The ID2 however is only valid up to a specific offset. */ 466 if (strcasecmp(master_replid, server.replid) && 467 (strcasecmp(master_replid, server.replid2) || 468 psync_offset > server.second_replid_offset)) 469 { 470 /* Run id "?" is used by slaves that want to force a full resync. 
*/ 471 if (master_replid[0] != '?') { 472 if (strcasecmp(master_replid, server.replid) && 473 strcasecmp(master_replid, server.replid2)) 474 { 475 serverLog(LL_NOTICE,"Partial resynchronization not accepted: " 476 "Replication ID mismatch (Replica asked for '%s', my " 477 "replication IDs are '%s' and '%s')", 478 master_replid, server.replid, server.replid2); 479 } else { 480 serverLog(LL_NOTICE,"Partial resynchronization not accepted: " 481 "Requested offset for second ID was %lld, but I can reply " 482 "up to %lld", psync_offset, server.second_replid_offset); 483 } 484 } else { 485 serverLog(LL_NOTICE,"Full resync requested by replica %s", 486 replicationGetSlaveName(c)); 487 } 488 goto need_full_resync; 489 } 490 491 /* We still have the data our slave is asking for? */ 492 if (!server.repl_backlog || 493 psync_offset < server.repl_backlog_off || 494 psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) 495 { 496 serverLog(LL_NOTICE, 497 "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset); 498 if (psync_offset > server.master_repl_offset) { 499 serverLog(LL_WARNING, 500 "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); 501 } 502 goto need_full_resync; 503 } 504 505 /* If we reached this point, we are able to perform a partial resync: 506 * 1) Set client state to make it a slave. 507 * 2) Inform the client we can continue with +CONTINUE 508 * 3) Send the backlog data (from the offset to the end) to the slave. */ 509 c->flags |= CLIENT_SLAVE; 510 c->replstate = SLAVE_STATE_ONLINE; 511 c->repl_ack_time = server.unixtime; 512 c->repl_put_online_on_ack = 0; 513 listAddNodeTail(server.slaves,c); 514 /* We can't use the connection buffers since they are used to accumulate 515 * new commands at this stage. 
But we are sure the socket send buffer is 516 * empty so this write will never fail actually. */ 517 if (c->slave_capa & SLAVE_CAPA_PSYNC2) { 518 buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid); 519 } else { 520 buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); 521 } 522 if (write(c->fd,buf,buflen) != buflen) { 523 freeClientAsync(c); 524 return C_OK; 525 } 526 psync_len = addReplyReplicationBacklog(c,psync_offset); 527 serverLog(LL_NOTICE, 528 "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", 529 replicationGetSlaveName(c), 530 psync_len, psync_offset); 531 /* Note that we don't need to set the selected DB at server.slaveseldb 532 * to -1 to force the master to emit SELECT, since the slave already 533 * has this state from the previous connection with the master. */ 534 535 refreshGoodSlavesCount(); 536 return C_OK; /* The caller can return, no full resync needed. */ 537 538 need_full_resync: 539 /* We need a full resync for some reason... Note that we can't 540 * reply to PSYNC right now if a full SYNC is needed. The reply 541 * must include the master offset at the time the RDB file we transfer 542 * is generated, so we need to delay the reply to that moment. */ 543 return C_ERR; 544 } 545 546 /* Start a BGSAVE for replication goals, which is, selecting the disk or 547 * socket target depending on the configuration, and making sure that 548 * the script cache is flushed before to start. 549 * 550 * The mincapa argument is the bitwise AND among all the slaves capabilities 551 * of the slaves waiting for this BGSAVE, so represents the slave capabilities 552 * all the slaves support. Can be tested via SLAVE_CAPA_* macros. 553 * 554 * Side effects, other than starting a BGSAVE: 555 * 556 * 1) Handle the slaves in WAIT_START state, by preparing them for a full 557 * sync if the BGSAVE was successfully started, or sending them an error 558 * and dropping them from the list of slaves. 
559 * 560 * 2) Flush the Lua scripting script cache if the BGSAVE was actually 561 * started. 562 * 563 * Returns C_OK on success or C_ERR otherwise. */ 564 int startBgsaveForReplication(int mincapa) { 565 int retval; 566 int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); 567 listIter li; 568 listNode *ln; 569 570 serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s", 571 socket_target ? "replicas sockets" : "disk"); 572 573 rdbSaveInfo rsi, *rsiptr; 574 rsiptr = rdbPopulateSaveInfo(&rsi); 575 /* Only do rdbSave* when rsiptr is not NULL, 576 * otherwise slave will miss repl-stream-db. */ 577 if (rsiptr) { 578 if (socket_target) 579 retval = rdbSaveToSlavesSockets(rsiptr); 580 else 581 retval = rdbSaveBackground(server.rdb_filename,rsiptr); 582 } else { 583 serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later."); 584 retval = C_ERR; 585 } 586 587 /* If we failed to BGSAVE, remove the slaves waiting for a full 588 * resynchorinization from the list of salves, inform them with 589 * an error about what happened, close the connection ASAP. */ 590 if (retval == C_ERR) { 591 serverLog(LL_WARNING,"BGSAVE for replication failed"); 592 listRewind(server.slaves,&li); 593 while((ln = listNext(&li))) { 594 client *slave = ln->value; 595 596 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { 597 slave->replstate = REPL_STATE_NONE; 598 slave->flags &= ~CLIENT_SLAVE; 599 listDelNode(server.slaves,ln); 600 addReplyError(slave, 601 "BGSAVE failed, replication can't continue"); 602 slave->flags |= CLIENT_CLOSE_AFTER_REPLY; 603 } 604 } 605 return retval; 606 } 607 608 /* If the target is socket, rdbSaveToSlavesSockets() already setup 609 * the salves for a full resync. 
Otherwise for disk target do it now.*/ 610 if (!socket_target) { 611 listRewind(server.slaves,&li); 612 while((ln = listNext(&li))) { 613 client *slave = ln->value; 614 615 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { 616 replicationSetupSlaveForFullResync(slave, 617 getPsyncInitialOffset()); 618 } 619 } 620 } 621 622 /* Flush the script cache, since we need that slave differences are 623 * accumulated without requiring slaves to match our cached scripts. */ 624 if (retval == C_OK) replicationScriptCacheFlush(); 625 return retval; 626 } 627 628 /* SYNC and PSYNC command implemenation. */ 629 void syncCommand(client *c) { 630 /* ignore SYNC if already slave or in monitor mode */ 631 if (c->flags & CLIENT_SLAVE) return; 632 633 /* Refuse SYNC requests if we are a slave but the link with our master 634 * is not ok... */ 635 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { 636 addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n")); 637 return; 638 } 639 640 /* SYNC can't be issued when the server has pending data to send to 641 * the client about already issued commands. We need a fresh reply 642 * buffer registering the differences between the BGSAVE and the current 643 * dataset, so that we can copy to other slaves if needed. */ 644 if (clientHasPendingReplies(c)) { 645 addReplyError(c,"SYNC and PSYNC are invalid with pending output"); 646 return; 647 } 648 649 serverLog(LL_NOTICE,"Replica %s asks for synchronization", 650 replicationGetSlaveName(c)); 651 652 /* Try a partial resynchronization if this is a PSYNC command. 653 * If it fails, we continue with usual full resynchronization, however 654 * when this happens masterTryPartialResynchronization() already 655 * replied with: 656 * 657 * +FULLRESYNC <replid> <offset> 658 * 659 * So the slave knows the new replid and offset to try a PSYNC later 660 * if the connection with the master is lost. 
*/ 661 if (!strcasecmp(c->argv[0]->ptr,"psync")) { 662 if (masterTryPartialResynchronization(c) == C_OK) { 663 server.stat_sync_partial_ok++; 664 return; /* No full resync needed, return. */ 665 } else { 666 char *master_replid = c->argv[1]->ptr; 667 668 /* Increment stats for failed PSYNCs, but only if the 669 * replid is not "?", as this is used by slaves to force a full 670 * resync on purpose when they are not albe to partially 671 * resync. */ 672 if (master_replid[0] != '?') server.stat_sync_partial_err++; 673 } 674 } else { 675 /* If a slave uses SYNC, we are dealing with an old implementation 676 * of the replication protocol (like redis-cli --slave). Flag the client 677 * so that we don't expect to receive REPLCONF ACK feedbacks. */ 678 c->flags |= CLIENT_PRE_PSYNC; 679 } 680 681 /* Full resynchronization. */ 682 server.stat_sync_full++; 683 684 /* Setup the slave as one waiting for BGSAVE to start. The following code 685 * paths will change the state if we handle the slave differently. */ 686 c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; 687 if (server.repl_disable_tcp_nodelay) 688 anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */ 689 c->repldbfd = -1; 690 c->flags |= CLIENT_SLAVE; 691 listAddNodeTail(server.slaves,c); 692 693 /* Create the replication backlog if needed. */ 694 if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) { 695 /* When we create the backlog from scratch, we always use a new 696 * replication ID and clear the ID2, since there is no valid 697 * past history. */ 698 changeReplicationId(); 699 clearReplicationId2(); 700 createReplicationBacklog(); 701 } 702 703 /* CASE 1: BGSAVE is in progress, with disk target. */ 704 if (server.rdb_child_pid != -1 && 705 server.rdb_child_type == RDB_CHILD_TYPE_DISK) 706 { 707 /* Ok a background save is in progress. Let's check if it is a good 708 * one for replication, i.e. 
if there is another slave that is 709 * registering differences since the server forked to save. */ 710 client *slave; 711 listNode *ln; 712 listIter li; 713 714 listRewind(server.slaves,&li); 715 while((ln = listNext(&li))) { 716 slave = ln->value; 717 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; 718 } 719 /* To attach this slave, we check that it has at least all the 720 * capabilities of the slave that triggered the current BGSAVE. */ 721 if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { 722 /* Perfect, the server is already registering differences for 723 * another slave. Set the right state, and copy the buffer. */ 724 copyClientOutputBuffer(c,slave); 725 replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); 726 serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); 727 } else { 728 /* No way, we need to wait for the next BGSAVE in order to 729 * register differences. */ 730 serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC"); 731 } 732 733 /* CASE 2: BGSAVE is in progress, with socket target. */ 734 } else if (server.rdb_child_pid != -1 && 735 server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) 736 { 737 /* There is an RDB child process but it is writing directly to 738 * children sockets. We need to wait for the next BGSAVE 739 * in order to synchronize. */ 740 serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC"); 741 742 /* CASE 3: There is no BGSAVE is progress. */ 743 } else { 744 if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) { 745 /* Diskless replication RDB child is created inside 746 * replicationCron() since we want to delay its start a 747 * few seconds to wait for more slaves to arrive. 
*/ 748 if (server.repl_diskless_sync_delay) 749 serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC"); 750 } else { 751 /* Target is disk (or the slave is not capable of supporting 752 * diskless replication) and we don't have a BGSAVE in progress, 753 * let's start one. */ 754 if (server.aof_child_pid == -1) { 755 startBgsaveForReplication(c->slave_capa); 756 } else { 757 serverLog(LL_NOTICE, 758 "No BGSAVE in progress, but an AOF rewrite is active. " 759 "BGSAVE for replication delayed"); 760 } 761 } 762 } 763 return; 764 } 765 766 /* REPLCONF <option> <value> <option> <value> ... 767 * This command is used by a slave in order to configure the replication 768 * process before starting it with the SYNC command. 769 * 770 * Currently the only use of this command is to communicate to the master 771 * what is the listening port of the Slave redis instance, so that the 772 * master can accurately list slaves and their listening ports in 773 * the INFO output. 774 * 775 * In the future the same command can be used in order to configure 776 * the replication to initiate an incremental replication instead of a 777 * full resync. */ 778 void replconfCommand(client *c) { 779 int j; 780 781 if ((c->argc % 2) == 0) { 782 /* Number of arguments must be odd to make sure that every 783 * option has a corresponding value. */ 784 addReply(c,shared.syntaxerr); 785 return; 786 } 787 788 /* Process every option-value pair. 
*/ 789 for (j = 1; j < c->argc; j+=2) { 790 if (!strcasecmp(c->argv[j]->ptr,"listening-port")) { 791 long port; 792 793 if ((getLongFromObjectOrReply(c,c->argv[j+1], 794 &port,NULL) != C_OK)) 795 return; 796 c->slave_listening_port = port; 797 } else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) { 798 sds ip = c->argv[j+1]->ptr; 799 if (sdslen(ip) < sizeof(c->slave_ip)) { 800 memcpy(c->slave_ip,ip,sdslen(ip)+1); 801 } else { 802 addReplyErrorFormat(c,"REPLCONF ip-address provided by " 803 "replica instance is too long: %zd bytes", sdslen(ip)); 804 return; 805 } 806 } else if (!strcasecmp(c->argv[j]->ptr,"capa")) { 807 /* Ignore capabilities not understood by this master. */ 808 if (!strcasecmp(c->argv[j+1]->ptr,"eof")) 809 c->slave_capa |= SLAVE_CAPA_EOF; 810 else if (!strcasecmp(c->argv[j+1]->ptr,"psync2")) 811 c->slave_capa |= SLAVE_CAPA_PSYNC2; 812 } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { 813 /* REPLCONF ACK is used by slave to inform the master the amount 814 * of replication stream that it processed so far. It is an 815 * internal only command that normal clients should never use. */ 816 long long offset; 817 818 if (!(c->flags & CLIENT_SLAVE)) return; 819 if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK)) 820 return; 821 if (offset > c->repl_ack_off) 822 c->repl_ack_off = offset; 823 c->repl_ack_time = server.unixtime; 824 /* If this was a diskless replication, we need to really put 825 * the slave online when the first ACK is received (which 826 * confirms slave is online and ready to get more data). */ 827 if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE) 828 putSlaveOnline(c); 829 /* Note: this command does not reply anything! */ 830 return; 831 } else if (!strcasecmp(c->argv[j]->ptr,"getack")) { 832 /* REPLCONF GETACK is used in order to request an ACK ASAP 833 * to the slave. 
*/ 834 if (server.masterhost && server.master) replicationSendAck(); 835 return; 836 } else { 837 addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", 838 (char*)c->argv[j]->ptr); 839 return; 840 } 841 } 842 addReply(c,shared.ok); 843 } 844 845 /* This function puts a slave in the online state, and should be called just 846 * after a slave received the RDB file for the initial synchronization, and 847 * we are finally ready to send the incremental stream of commands. 848 * 849 * It does a few things: 850 * 851 * 1) Put the slave in ONLINE state (useless when the function is called 852 * because state is already ONLINE but repl_put_online_on_ack is true). 853 * 2) Make sure the writable event is re-installed, since calling the SYNC 854 * command disables it, so that we can accumulate output buffer without 855 * sending it to the slave. 856 * 3) Update the count of good slaves. */ 857 void putSlaveOnline(client *slave) { 858 slave->replstate = SLAVE_STATE_ONLINE; 859 slave->repl_put_online_on_ack = 0; 860 slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */ 861 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, 862 sendReplyToClient, slave) == AE_ERR) { 863 serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno)); 864 freeClient(slave); 865 return; 866 } 867 refreshGoodSlavesCount(); 868 serverLog(LL_NOTICE,"Synchronization with replica %s succeeded", 869 replicationGetSlaveName(slave)); 870 } 871 872 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { 873 client *slave = privdata; 874 UNUSED(el); 875 UNUSED(mask); 876 char buf[PROTO_IOBUF_LEN]; 877 ssize_t nwritten, buflen; 878 879 /* Before sending the RDB file, we send the preamble as configured by the 880 * replication process. Currently the preamble is just the bulk count of 881 * the file in the form "$<length>\r\n". 
*/ 882 if (slave->replpreamble) { 883 nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble)); 884 if (nwritten == -1) { 885 serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s", 886 strerror(errno)); 887 freeClient(slave); 888 return; 889 } 890 server.stat_net_output_bytes += nwritten; 891 sdsrange(slave->replpreamble,nwritten,-1); 892 if (sdslen(slave->replpreamble) == 0) { 893 sdsfree(slave->replpreamble); 894 slave->replpreamble = NULL; 895 /* fall through sending data. */ 896 } else { 897 return; 898 } 899 } 900 901 /* If the preamble was already transferred, send the RDB bulk data. */ 902 lseek(slave->repldbfd,slave->repldboff,SEEK_SET); 903 buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN); 904 if (buflen <= 0) { 905 serverLog(LL_WARNING,"Read error sending DB to replica: %s", 906 (buflen == 0) ? "premature EOF" : strerror(errno)); 907 freeClient(slave); 908 return; 909 } 910 if ((nwritten = write(fd,buf,buflen)) == -1) { 911 if (errno != EAGAIN) { 912 serverLog(LL_WARNING,"Write error sending DB to replica: %s", 913 strerror(errno)); 914 freeClient(slave); 915 } 916 return; 917 } 918 slave->repldboff += nwritten; 919 server.stat_net_output_bytes += nwritten; 920 if (slave->repldboff == slave->repldbsize) { 921 close(slave->repldbfd); 922 slave->repldbfd = -1; 923 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 924 putSlaveOnline(slave); 925 } 926 } 927 928 /* This function is called at the end of every background saving, 929 * or when the replication RDB transfer strategy is modified from 930 * disk to socket or the other way around. 931 * 932 * The goal of this function is to handle slaves waiting for a successful 933 * background saving in order to perform non-blocking synchronization, and 934 * to schedule a new BGSAVE if there are slaves that attached while a 935 * BGSAVE was in progress, but it was not a good one for replication (no 936 * other slave was accumulating differences). 
937 * 938 * The argument bgsaveerr is C_OK if the background saving succeeded 939 * otherwise C_ERR is passed to the function. 940 * The 'type' argument is the type of the child that terminated 941 * (if it had a disk or socket target). */ 942 void updateSlavesWaitingBgsave(int bgsaveerr, int type) { 943 listNode *ln; 944 int startbgsave = 0; 945 int mincapa = -1; 946 listIter li; 947 948 listRewind(server.slaves,&li); 949 while((ln = listNext(&li))) { 950 client *slave = ln->value; 951 952 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { 953 startbgsave = 1; 954 mincapa = (mincapa == -1) ? slave->slave_capa : 955 (mincapa & slave->slave_capa); 956 } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { 957 struct redis_stat buf; 958 959 /* If this was an RDB on disk save, we have to prepare to send 960 * the RDB from disk to the slave socket. Otherwise if this was 961 * already an RDB -> Slaves socket transfer, used in the case of 962 * diskless replication, our work is trivial, we can just put 963 * the slave online. */ 964 if (type == RDB_CHILD_TYPE_SOCKET) { 965 serverLog(LL_NOTICE, 966 "Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming", 967 replicationGetSlaveName(slave)); 968 /* Note: we wait for a REPLCONF ACK message from slave in 969 * order to really put it online (install the write handler 970 * so that the accumulated data can be transferred). However 971 * we change the replication state ASAP, since our slave 972 * is technically online now. */ 973 slave->replstate = SLAVE_STATE_ONLINE; 974 slave->repl_put_online_on_ack = 1; 975 slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */ 976 } else { 977 if (bgsaveerr != C_OK) { 978 freeClient(slave); 979 serverLog(LL_WARNING,"SYNC failed. 
BGSAVE child returned an error"); 980 continue; 981 } 982 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || 983 redis_fstat(slave->repldbfd,&buf) == -1) { 984 freeClient(slave); 985 serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); 986 continue; 987 } 988 slave->repldboff = 0; 989 slave->repldbsize = buf.st_size; 990 slave->replstate = SLAVE_STATE_SEND_BULK; 991 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", 992 (unsigned long long) slave->repldbsize); 993 994 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 995 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { 996 freeClient(slave); 997 continue; 998 } 999 } 1000 } 1001 } 1002 if (startbgsave) startBgsaveForReplication(mincapa); 1003 } 1004 1005 /* Change the current instance replication ID with a new, random one. 1006 * This will prevent successful PSYNCs between this master and other 1007 * slaves, so the command should be called when something happens that 1008 * alters the current story of the dataset. */ 1009 void changeReplicationId(void) { 1010 getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE); 1011 server.replid[CONFIG_RUN_ID_SIZE] = '\0'; 1012 } 1013 1014 /* Clear (invalidate) the secondary replication ID. This happens, for 1015 * example, after a full resynchronization, when we start a new replication 1016 * history. */ 1017 void clearReplicationId2(void) { 1018 memset(server.replid2,'0',sizeof(server.replid)); 1019 server.replid2[CONFIG_RUN_ID_SIZE] = '\0'; 1020 server.second_replid_offset = -1; 1021 } 1022 1023 /* Use the current replication ID / offset as secondary replication 1024 * ID, and change the current one in order to start a new history. 1025 * This should be used when an instance is switched from slave to master 1026 * so that it can serve PSYNC requests performed using the master 1027 * replication ID. 
*/ 1028 void shiftReplicationId(void) { 1029 memcpy(server.replid2,server.replid,sizeof(server.replid)); 1030 /* We set the second replid offset to the master offset + 1, since 1031 * the slave will ask for the first byte it has not yet received, so 1032 * we need to add one to the offset: for example if, as a slave, we are 1033 * sure we have the same history as the master for 50 bytes, after we 1034 * are turned into a master, we can accept a PSYNC request with offset 1035 * 51, since the slave asking has the same history up to the 50th 1036 * byte, and is asking for the new bytes starting at offset 51. */ 1037 server.second_replid_offset = server.master_repl_offset+1; 1038 changeReplicationId(); 1039 serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid); 1040 } 1041 1042 /* ----------------------------------- SLAVE -------------------------------- */ 1043 1044 /* Returns 1 if the given replication state is a handshake state, 1045 * 0 otherwise. */ 1046 int slaveIsInHandshakeState(void) { 1047 return server.repl_state >= REPL_STATE_RECEIVE_PONG && 1048 server.repl_state <= REPL_STATE_RECEIVE_PSYNC; 1049 } 1050 1051 /* Avoid the master to detect the slave is timing out while loading the 1052 * RDB file in initial synchronization. We send a single newline character 1053 * that is valid protocol but is guaranteed to either be sent entirely or 1054 * not, since the byte is indivisible. 1055 * 1056 * The function is called in two contexts: while we flush the current 1057 * data with emptyDb(), and while we load the new data received as an 1058 * RDB file from the master. */ 1059 void replicationSendNewlineToMaster(void) { 1060 static time_t newline_sent; 1061 if (time(NULL) != newline_sent) { 1062 newline_sent = time(NULL); 1063 if (write(server.repl_transfer_s,"\n",1) == -1) { 1064 /* Pinging back in this stage is best-effort. 
*/ 1065 } 1066 } 1067 } 1068 1069 /* Callback used by emptyDb() while flushing away old data to load 1070 * the new dataset received by the master. */ 1071 void replicationEmptyDbCallback(void *privdata) { 1072 UNUSED(privdata); 1073 replicationSendNewlineToMaster(); 1074 } 1075 1076 /* Once we have a link with the master and the synchroniziation was 1077 * performed, this function materializes the master client we store 1078 * at server.master, starting from the specified file descriptor. */ 1079 void replicationCreateMasterClient(int fd, int dbid) { 1080 server.master = createClient(fd); 1081 server.master->flags |= CLIENT_MASTER; 1082 server.master->authenticated = 1; 1083 server.master->reploff = server.master_initial_offset; 1084 server.master->read_reploff = server.master->reploff; 1085 memcpy(server.master->replid, server.master_replid, 1086 sizeof(server.master_replid)); 1087 /* If master offset is set to -1, this master is old and is not 1088 * PSYNC capable, so we flag it accordingly. */ 1089 if (server.master->reploff == -1) 1090 server.master->flags |= CLIENT_PRE_PSYNC; 1091 if (dbid != -1) selectDb(server.master,dbid); 1092 } 1093 1094 /* This function will try to re-enable the AOF file after the 1095 * master-replica synchronization: if it fails after multiple attempts 1096 * the replica cannot be considered reliable and exists with an 1097 * error. */ 1098 void restartAOFAfterSYNC() { 1099 unsigned int tries, max_tries = 10; 1100 for (tries = 0; tries < max_tries; ++tries) { 1101 if (startAppendOnly() == C_OK) break; 1102 serverLog(LL_WARNING, 1103 "Failed enabling the AOF after successful master synchronization! " 1104 "Trying it again in one second."); 1105 sleep(1); 1106 } 1107 if (tries == max_tries) { 1108 serverLog(LL_WARNING, 1109 "FATAL: this replica instance finished the synchronization with " 1110 "its master, but the AOF can't be turned on. 
Exiting now."); 1111 exit(1); 1112 } 1113 } 1114 1115 /* Asynchronously read the SYNC payload we receive from a master */ 1116 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ 1117 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { 1118 char buf[4096]; 1119 ssize_t nread, readlen, nwritten; 1120 off_t left; 1121 UNUSED(el); 1122 UNUSED(privdata); 1123 UNUSED(mask); 1124 1125 /* Static vars used to hold the EOF mark, and the last bytes received 1126 * form the server: when they match, we reached the end of the transfer. */ 1127 static char eofmark[CONFIG_RUN_ID_SIZE]; 1128 static char lastbytes[CONFIG_RUN_ID_SIZE]; 1129 static int usemark = 0; 1130 1131 /* If repl_transfer_size == -1 we still have to read the bulk length 1132 * from the master reply. */ 1133 if (server.repl_transfer_size == -1) { 1134 if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { 1135 serverLog(LL_WARNING, 1136 "I/O error reading bulk count from MASTER: %s", 1137 strerror(errno)); 1138 goto error; 1139 } 1140 1141 if (buf[0] == '-') { 1142 serverLog(LL_WARNING, 1143 "MASTER aborted replication with an error: %s", 1144 buf+1); 1145 goto error; 1146 } else if (buf[0] == '\0') { 1147 /* At this stage just a newline works as a PING in order to take 1148 * the connection live. So we refresh our last interaction 1149 * timestamp. */ 1150 server.repl_transfer_lastio = server.unixtime; 1151 return; 1152 } else if (buf[0] != '$') { 1153 serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); 1154 goto error; 1155 } 1156 1157 /* There are two possible forms for the bulk payload. One is the 1158 * usual $<count> bulk format. The other is used for diskless transfers 1159 * when the master does not know beforehand the size of the file to 1160 * transfer. 
In the latter case, the following format is used: 1161 * 1162 * $EOF:<40 bytes delimiter> 1163 * 1164 * At the end of the file the announced delimiter is transmitted. The 1165 * delimiter is long and random enough that the probability of a 1166 * collision with the actual file content can be ignored. */ 1167 if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) { 1168 usemark = 1; 1169 memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE); 1170 memset(lastbytes,0,CONFIG_RUN_ID_SIZE); 1171 /* Set any repl_transfer_size to avoid entering this code path 1172 * at the next call. */ 1173 server.repl_transfer_size = 0; 1174 serverLog(LL_NOTICE, 1175 "MASTER <-> REPLICA sync: receiving streamed RDB from master"); 1176 } else { 1177 usemark = 0; 1178 server.repl_transfer_size = strtol(buf+1,NULL,10); 1179 serverLog(LL_NOTICE, 1180 "MASTER <-> REPLICA sync: receiving %lld bytes from master", 1181 (long long) server.repl_transfer_size); 1182 } 1183 return; 1184 } 1185 1186 /* Read bulk data */ 1187 if (usemark) { 1188 readlen = sizeof(buf); 1189 } else { 1190 left = server.repl_transfer_size - server.repl_transfer_read; 1191 readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); 1192 } 1193 1194 nread = read(fd,buf,readlen); 1195 if (nread <= 0) { 1196 serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", 1197 (nread == -1) ? strerror(errno) : "connection lost"); 1198 cancelReplicationHandshake(); 1199 return; 1200 } 1201 server.stat_net_input_bytes += nread; 1202 1203 /* When a mark is used, we want to detect EOF asap in order to avoid 1204 * writing the EOF mark into the file... 
*/ 1205 int eof_reached = 0; 1206 1207 if (usemark) { 1208 /* Update the last bytes array, and check if it matches our delimiter.*/ 1209 if (nread >= CONFIG_RUN_ID_SIZE) { 1210 memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE); 1211 } else { 1212 int rem = CONFIG_RUN_ID_SIZE-nread; 1213 memmove(lastbytes,lastbytes+nread,rem); 1214 memcpy(lastbytes+rem,buf,nread); 1215 } 1216 if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; 1217 } 1218 1219 server.repl_transfer_lastio = server.unixtime; 1220 if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) { 1221 serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s", 1222 (nwritten == -1) ? strerror(errno) : "short write"); 1223 goto error; 1224 } 1225 server.repl_transfer_read += nread; 1226 1227 /* Delete the last 40 bytes from the file if we reached EOF. */ 1228 if (usemark && eof_reached) { 1229 if (ftruncate(server.repl_transfer_fd, 1230 server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) 1231 { 1232 serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); 1233 goto error; 1234 } 1235 } 1236 1237 /* Sync data on disk from time to time, otherwise at the end of the transfer 1238 * we may suffer a big delay as the memory buffers are copied into the 1239 * actual disk. 
*/ 1240 if (server.repl_transfer_read >= 1241 server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) 1242 { 1243 off_t sync_size = server.repl_transfer_read - 1244 server.repl_transfer_last_fsync_off; 1245 rdb_fsync_range(server.repl_transfer_fd, 1246 server.repl_transfer_last_fsync_off, sync_size); 1247 server.repl_transfer_last_fsync_off += sync_size; 1248 } 1249 1250 /* Check if the transfer is now complete */ 1251 if (!usemark) { 1252 if (server.repl_transfer_read == server.repl_transfer_size) 1253 eof_reached = 1; 1254 } 1255 1256 if (eof_reached) { 1257 int aof_is_enabled = server.aof_state != AOF_OFF; 1258 1259 /* Ensure background save doesn't overwrite synced data */ 1260 if (server.rdb_child_pid != -1) { 1261 serverLog(LL_NOTICE, 1262 "Replica is about to load the RDB file received from the " 1263 "master, but there is a pending RDB child running. " 1264 "Killing process %ld and removing its temp file to avoid " 1265 "any race", 1266 (long) server.rdb_child_pid); 1267 kill(server.rdb_child_pid,SIGUSR1); 1268 rdbRemoveTempFile(server.rdb_child_pid); 1269 } 1270 1271 if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { 1272 serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> REPLICA synchronization: %s", strerror(errno)); 1273 cancelReplicationHandshake(); 1274 return; 1275 } 1276 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); 1277 /* We need to stop any AOFRW fork before flusing and parsing 1278 * RDB, otherwise we'll create a copy-on-write disaster. */ 1279 if(aof_is_enabled) stopAppendOnly(); 1280 signalFlushedDb(-1); 1281 emptyDb( 1282 -1, 1283 server.repl_slave_lazy_flush ? 
EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, 1284 replicationEmptyDbCallback); 1285 /* Before loading the DB into memory we need to delete the readable 1286 * handler, otherwise it will get called recursively since 1287 * rdbLoad() will call the event loop to process events from time to 1288 * time for non blocking loading. */ 1289 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); 1290 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); 1291 rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; 1292 if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { 1293 serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); 1294 cancelReplicationHandshake(); 1295 /* Re-enable the AOF if we disabled it earlier, in order to restore 1296 * the original configuration. */ 1297 if (aof_is_enabled) restartAOFAfterSYNC(); 1298 return; 1299 } 1300 /* Final setup of the connected slave <- master link */ 1301 zfree(server.repl_transfer_tmpfile); 1302 close(server.repl_transfer_fd); 1303 replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); 1304 server.repl_state = REPL_STATE_CONNECTED; 1305 server.repl_down_since = 0; 1306 /* After a full resynchroniziation we use the replication ID and 1307 * offset of the master. The secondary ID / offset are cleared since 1308 * we are starting a new history. */ 1309 memcpy(server.replid,server.master->replid,sizeof(server.replid)); 1310 server.master_repl_offset = server.master->reploff; 1311 clearReplicationId2(); 1312 /* Let's create the replication backlog if needed. Slaves need to 1313 * accumulate the backlog regardless of the fact they have sub-slaves 1314 * or not, in order to behave correctly if they are promoted to 1315 * masters after a failover. */ 1316 if (server.repl_backlog == NULL) createReplicationBacklog(); 1317 1318 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success"); 1319 /* Restart the AOF subsystem now that we finished the sync. 
This 1320 * will trigger an AOF rewrite, and when done will start appending 1321 * to the new file. */ 1322 if (aof_is_enabled) restartAOFAfterSYNC(); 1323 } 1324 return; 1325 1326 error: 1327 cancelReplicationHandshake(); 1328 return; 1329 } 1330 1331 /* Send a synchronous command to the master. Used to send AUTH and 1332 * REPLCONF commands before starting the replication with SYNC. 1333 * 1334 * The command returns an sds string representing the result of the 1335 * operation. On error the first byte is a "-". 1336 */ 1337 #define SYNC_CMD_READ (1<<0) 1338 #define SYNC_CMD_WRITE (1<<1) 1339 #define SYNC_CMD_FULL (SYNC_CMD_READ|SYNC_CMD_WRITE) 1340 char *sendSynchronousCommand(int flags, int fd, ...) { 1341 1342 /* Create the command to send to the master, we use redis binary 1343 * protocol to make sure correct arguments are sent. This function 1344 * is not safe for all binary data. */ 1345 if (flags & SYNC_CMD_WRITE) { 1346 char *arg; 1347 va_list ap; 1348 sds cmd = sdsempty(); 1349 sds cmdargs = sdsempty(); 1350 size_t argslen = 0; 1351 va_start(ap,fd); 1352 1353 while(1) { 1354 arg = va_arg(ap, char*); 1355 if (arg == NULL) break; 1356 1357 cmdargs = sdscatprintf(cmdargs,"$%zu\r\n%s\r\n",strlen(arg),arg); 1358 argslen++; 1359 } 1360 1361 va_end(ap); 1362 1363 cmd = sdscatprintf(cmd,"*%zu\r\n",argslen); 1364 cmd = sdscatsds(cmd,cmdargs); 1365 sdsfree(cmdargs); 1366 1367 /* Transfer command to the server. */ 1368 if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) 1369 == -1) 1370 { 1371 sdsfree(cmd); 1372 return sdscatprintf(sdsempty(),"-Writing to master: %s", 1373 strerror(errno)); 1374 } 1375 sdsfree(cmd); 1376 } 1377 1378 /* Read the reply from the server. 
*/ 1379 if (flags & SYNC_CMD_READ) { 1380 char buf[256]; 1381 1382 if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) 1383 == -1) 1384 { 1385 return sdscatprintf(sdsempty(),"-Reading from master: %s", 1386 strerror(errno)); 1387 } 1388 server.repl_transfer_lastio = server.unixtime; 1389 return sdsnew(buf); 1390 } 1391 return NULL; 1392 } 1393 1394 /* Try a partial resynchronization with the master if we are about to reconnect. 1395 * If there is no cached master structure, at least try to issue a 1396 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC 1397 * command in order to obtain the master run id and the master replication 1398 * global offset. 1399 * 1400 * This function is designed to be called from syncWithMaster(), so the 1401 * following assumptions are made: 1402 * 1403 * 1) We pass the function an already connected socket "fd". 1404 * 2) This function does not close the file descriptor "fd". However in case 1405 * of successful partial resynchronization, the function will reuse 1406 * 'fd' as file descriptor of the server.master client structure. 1407 * 1408 * The function is split in two halves: if read_reply is 0, the function 1409 * writes the PSYNC command on the socket, and a new function call is 1410 * needed, with read_reply set to 1, in order to read the reply of the 1411 * command. This is useful in order to support non blocking operations, so 1412 * that we write, return into the event loop, and read when there are data. 1413 * 1414 * When read_reply is 0 the function returns PSYNC_WRITE_ERR if there 1415 * was a write error, or PSYNC_WAIT_REPLY to signal we need another call 1416 * with read_reply set to 1. However even when read_reply is set to 1 1417 * the function may return PSYNC_WAIT_REPLY again to signal there were 1418 * insufficient data to read to complete its work. We should re-enter 1419 * into the event loop and wait in such a case. 
1420 * 1421 * The function returns: 1422 * 1423 * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue. 1424 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed. 1425 * In this case the master run_id and global replication 1426 * offset is saved. 1427 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and 1428 * the caller should fall back to SYNC. 1429 * PSYNC_WRITE_ERROR: There was an error writing the command to the socket. 1430 * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1. 1431 * PSYNC_TRY_LATER: Master is currently in a transient error condition. 1432 * 1433 * Notable side effects: 1434 * 1435 * 1) As a side effect of the function call the function removes the readable 1436 * event handler from "fd", unless the return value is PSYNC_WAIT_REPLY. 1437 * 2) server.master_initial_offset is set to the right value according 1438 * to the master reply. This will be used to populate the 'server.master' 1439 * structure replication offset. 1440 */ 1441 1442 #define PSYNC_WRITE_ERROR 0 1443 #define PSYNC_WAIT_REPLY 1 1444 #define PSYNC_CONTINUE 2 1445 #define PSYNC_FULLRESYNC 3 1446 #define PSYNC_NOT_SUPPORTED 4 1447 #define PSYNC_TRY_LATER 5 1448 int slaveTryPartialResynchronization(int fd, int read_reply) { 1449 char *psync_replid; 1450 char psync_offset[32]; 1451 sds reply; 1452 1453 /* Writing half */ 1454 if (!read_reply) { 1455 /* Initially set master_initial_offset to -1 to mark the current 1456 * master run_id and offset as not valid. Later if we'll be able to do 1457 * a FULL resync using the PSYNC command we'll set the offset at the 1458 * right value, so that this information will be propagated to the 1459 * client structure representing the master into server.master. 
*/ 1460 server.master_initial_offset = -1; 1461 1462 if (server.cached_master) { 1463 psync_replid = server.cached_master->replid; 1464 snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); 1465 serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); 1466 } else { 1467 serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)"); 1468 psync_replid = "?"; 1469 memcpy(psync_offset,"-1",3); 1470 } 1471 1472 /* Issue the PSYNC command */ 1473 reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL); 1474 if (reply != NULL) { 1475 serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply); 1476 sdsfree(reply); 1477 aeDeleteFileEvent(server.el,fd,AE_READABLE); 1478 return PSYNC_WRITE_ERROR; 1479 } 1480 return PSYNC_WAIT_REPLY; 1481 } 1482 1483 /* Reading half */ 1484 reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1485 if (sdslen(reply) == 0) { 1486 /* The master may send empty newlines after it receives PSYNC 1487 * and before to reply, just to keep the connection alive. */ 1488 sdsfree(reply); 1489 return PSYNC_WAIT_REPLY; 1490 } 1491 1492 aeDeleteFileEvent(server.el,fd,AE_READABLE); 1493 1494 if (!strncmp(reply,"+FULLRESYNC",11)) { 1495 char *replid = NULL, *offset = NULL; 1496 1497 /* FULL RESYNC, parse the reply in order to extract the run id 1498 * and the replication offset. */ 1499 replid = strchr(reply,' '); 1500 if (replid) { 1501 replid++; 1502 offset = strchr(replid,' '); 1503 if (offset) offset++; 1504 } 1505 if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) { 1506 serverLog(LL_WARNING, 1507 "Master replied with wrong +FULLRESYNC syntax."); 1508 /* This is an unexpected condition, actually the +FULLRESYNC 1509 * reply means that the master supports PSYNC, but the reply 1510 * format seems wrong. To stay safe we blank the master 1511 * replid to make sure next PSYNCs will fail. 
*/ 1512 memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1); 1513 } else { 1514 memcpy(server.master_replid, replid, offset-replid-1); 1515 server.master_replid[CONFIG_RUN_ID_SIZE] = '\0'; 1516 server.master_initial_offset = strtoll(offset,NULL,10); 1517 serverLog(LL_NOTICE,"Full resync from master: %s:%lld", 1518 server.master_replid, 1519 server.master_initial_offset); 1520 } 1521 /* We are going to full resync, discard the cached master structure. */ 1522 replicationDiscardCachedMaster(); 1523 sdsfree(reply); 1524 return PSYNC_FULLRESYNC; 1525 } 1526 1527 if (!strncmp(reply,"+CONTINUE",9)) { 1528 /* Partial resync was accepted. */ 1529 serverLog(LL_NOTICE, 1530 "Successful partial resynchronization with master."); 1531 1532 /* Check the new replication ID advertised by the master. If it 1533 * changed, we need to set the new ID as primary ID, and set or 1534 * secondary ID as the old master ID up to the current offset, so 1535 * that our sub-slaves will be able to PSYNC with us after a 1536 * disconnection. */ 1537 char *start = reply+10; 1538 char *end = reply+9; 1539 while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++; 1540 if (end-start == CONFIG_RUN_ID_SIZE) { 1541 char new[CONFIG_RUN_ID_SIZE+1]; 1542 memcpy(new,start,CONFIG_RUN_ID_SIZE); 1543 new[CONFIG_RUN_ID_SIZE] = '\0'; 1544 1545 if (strcmp(new,server.cached_master->replid)) { 1546 /* Master ID changed. */ 1547 serverLog(LL_WARNING,"Master replication ID changed to %s",new); 1548 1549 /* Set the old ID as our ID2, up to the current offset+1. */ 1550 memcpy(server.replid2,server.cached_master->replid, 1551 sizeof(server.replid2)); 1552 server.second_replid_offset = server.master_repl_offset+1; 1553 1554 /* Update the cached master ID and our own primary ID to the 1555 * new one. */ 1556 memcpy(server.replid,new,sizeof(server.replid)); 1557 memcpy(server.cached_master->replid,new,sizeof(server.replid)); 1558 1559 /* Disconnect all the sub-slaves: they need to be notified. 
*/ 1560 disconnectSlaves(); 1561 } 1562 } 1563 1564 /* Setup the replication to continue. */ 1565 sdsfree(reply); 1566 replicationResurrectCachedMaster(fd); 1567 1568 /* If this instance was restarted and we read the metadata to 1569 * PSYNC from the persistence file, our replication backlog could 1570 * be still not initialized. Create it. */ 1571 if (server.repl_backlog == NULL) createReplicationBacklog(); 1572 return PSYNC_CONTINUE; 1573 } 1574 1575 /* If we reach this point we received either an error (since the master does 1576 * not understand PSYNC or because it is in a special state and cannot 1577 * serve our request), or an unexpected reply from the master. 1578 * 1579 * Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise 1580 * return PSYNC_TRY_LATER if we believe this is a transient error. */ 1581 1582 if (!strncmp(reply,"-NOMASTERLINK",13) || 1583 !strncmp(reply,"-LOADING",8)) 1584 { 1585 serverLog(LL_NOTICE, 1586 "Master is currently unable to PSYNC " 1587 "but should be in the future: %s", reply); 1588 sdsfree(reply); 1589 return PSYNC_TRY_LATER; 1590 } 1591 1592 if (strncmp(reply,"-ERR",4)) { 1593 /* If it's not an error, log the unexpected event. */ 1594 serverLog(LL_WARNING, 1595 "Unexpected reply to PSYNC from master: %s", reply); 1596 } else { 1597 serverLog(LL_NOTICE, 1598 "Master does not support PSYNC or is in " 1599 "error state (reply: %s)", reply); 1600 } 1601 sdsfree(reply); 1602 replicationDiscardCachedMaster(); 1603 return PSYNC_NOT_SUPPORTED; 1604 } 1605 1606 /* This handler fires when the non blocking connect was able to 1607 * establish a connection with the master. 
*/ 1608 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { 1609 char tmpfile[256], *err = NULL; 1610 int dfd = -1, maxtries = 5; 1611 int sockerr = 0, psync_result; 1612 socklen_t errlen = sizeof(sockerr); 1613 UNUSED(el); 1614 UNUSED(privdata); 1615 UNUSED(mask); 1616 1617 /* If this event fired after the user turned the instance into a master 1618 * with SLAVEOF NO ONE we must just return ASAP. */ 1619 if (server.repl_state == REPL_STATE_NONE) { 1620 close(fd); 1621 return; 1622 } 1623 1624 /* Check for errors in the socket: after a non blocking connect() we 1625 * may find that the socket is in error state. */ 1626 if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) 1627 sockerr = errno; 1628 if (sockerr) { 1629 serverLog(LL_WARNING,"Error condition on socket for SYNC: %s", 1630 strerror(sockerr)); 1631 goto error; 1632 } 1633 1634 /* Send a PING to check the master is able to reply without errors. */ 1635 if (server.repl_state == REPL_STATE_CONNECTING) { 1636 serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event."); 1637 /* Delete the writable event so that the readable event remains 1638 * registered and we can wait for the PONG reply. */ 1639 aeDeleteFileEvent(server.el,fd,AE_WRITABLE); 1640 server.repl_state = REPL_STATE_RECEIVE_PONG; 1641 /* Send the PING, don't check for errors at all, we have the timeout 1642 * that will take care about this. */ 1643 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL); 1644 if (err) goto write_error; 1645 return; 1646 } 1647 1648 /* Receive the PONG command. */ 1649 if (server.repl_state == REPL_STATE_RECEIVE_PONG) { 1650 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1651 1652 /* We accept only two replies as valid, a positive +PONG reply 1653 * (we just check for "+") or an authentication error. 1654 * Note that older versions of Redis replied with "operation not 1655 * permitted" instead of using a proper error code, so we test 1656 * both. 
*/ 1657 if (err[0] != '+' && 1658 strncmp(err,"-NOAUTH",7) != 0 && 1659 strncmp(err,"-ERR operation not permitted",28) != 0) 1660 { 1661 serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err); 1662 sdsfree(err); 1663 goto error; 1664 } else { 1665 serverLog(LL_NOTICE, 1666 "Master replied to PING, replication can continue..."); 1667 } 1668 sdsfree(err); 1669 server.repl_state = REPL_STATE_SEND_AUTH; 1670 } 1671 1672 /* AUTH with the master if required. */ 1673 if (server.repl_state == REPL_STATE_SEND_AUTH) { 1674 if (server.masterauth) { 1675 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL); 1676 if (err) goto write_error; 1677 server.repl_state = REPL_STATE_RECEIVE_AUTH; 1678 return; 1679 } else { 1680 server.repl_state = REPL_STATE_SEND_PORT; 1681 } 1682 } 1683 1684 /* Receive AUTH reply. */ 1685 if (server.repl_state == REPL_STATE_RECEIVE_AUTH) { 1686 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1687 if (err[0] == '-') { 1688 serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err); 1689 sdsfree(err); 1690 goto error; 1691 } 1692 sdsfree(err); 1693 server.repl_state = REPL_STATE_SEND_PORT; 1694 } 1695 1696 /* Set the slave port, so that Master's INFO command can list the 1697 * slave listening port correctly. */ 1698 if (server.repl_state == REPL_STATE_SEND_PORT) { 1699 sds port = sdsfromlonglong(server.slave_announce_port ? 1700 server.slave_announce_port : server.port); 1701 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", 1702 "listening-port",port, NULL); 1703 sdsfree(port); 1704 if (err) goto write_error; 1705 sdsfree(err); 1706 server.repl_state = REPL_STATE_RECEIVE_PORT; 1707 return; 1708 } 1709 1710 /* Receive REPLCONF listening-port reply. */ 1711 if (server.repl_state == REPL_STATE_RECEIVE_PORT) { 1712 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1713 /* Ignore the error if any, not all the Redis versions support 1714 * REPLCONF listening-port. 
*/ 1715 if (err[0] == '-') { 1716 serverLog(LL_NOTICE,"(Non critical) Master does not understand " 1717 "REPLCONF listening-port: %s", err); 1718 } 1719 sdsfree(err); 1720 server.repl_state = REPL_STATE_SEND_IP; 1721 } 1722 1723 /* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */ 1724 if (server.repl_state == REPL_STATE_SEND_IP && 1725 server.slave_announce_ip == NULL) 1726 { 1727 server.repl_state = REPL_STATE_SEND_CAPA; 1728 } 1729 1730 /* Set the slave ip, so that Master's INFO command can list the 1731 * slave IP address port correctly in case of port forwarding or NAT. */ 1732 if (server.repl_state == REPL_STATE_SEND_IP) { 1733 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", 1734 "ip-address",server.slave_announce_ip, NULL); 1735 if (err) goto write_error; 1736 sdsfree(err); 1737 server.repl_state = REPL_STATE_RECEIVE_IP; 1738 return; 1739 } 1740 1741 /* Receive REPLCONF ip-address reply. */ 1742 if (server.repl_state == REPL_STATE_RECEIVE_IP) { 1743 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1744 /* Ignore the error if any, not all the Redis versions support 1745 * REPLCONF listening-port. */ 1746 if (err[0] == '-') { 1747 serverLog(LL_NOTICE,"(Non critical) Master does not understand " 1748 "REPLCONF ip-address: %s", err); 1749 } 1750 sdsfree(err); 1751 server.repl_state = REPL_STATE_SEND_CAPA; 1752 } 1753 1754 /* Inform the master of our (slave) capabilities. 1755 * 1756 * EOF: supports EOF-style RDB transfer for diskless replication. 1757 * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>. 1758 * 1759 * The master will ignore capabilities it does not understand. */ 1760 if (server.repl_state == REPL_STATE_SEND_CAPA) { 1761 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", 1762 "capa","eof","capa","psync2",NULL); 1763 if (err) goto write_error; 1764 sdsfree(err); 1765 server.repl_state = REPL_STATE_RECEIVE_CAPA; 1766 return; 1767 } 1768 1769 /* Receive CAPA reply. 
*/ 1770 if (server.repl_state == REPL_STATE_RECEIVE_CAPA) { 1771 err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); 1772 /* Ignore the error if any, not all the Redis versions support 1773 * REPLCONF capa. */ 1774 if (err[0] == '-') { 1775 serverLog(LL_NOTICE,"(Non critical) Master does not understand " 1776 "REPLCONF capa: %s", err); 1777 } 1778 sdsfree(err); 1779 server.repl_state = REPL_STATE_SEND_PSYNC; 1780 } 1781 1782 /* Try a partial resynchonization. If we don't have a cached master 1783 * slaveTryPartialResynchronization() will at least try to use PSYNC 1784 * to start a full resynchronization so that we get the master run id 1785 * and the global offset, to try a partial resync at the next 1786 * reconnection attempt. */ 1787 if (server.repl_state == REPL_STATE_SEND_PSYNC) { 1788 if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) { 1789 err = sdsnew("Write error sending the PSYNC command."); 1790 goto write_error; 1791 } 1792 server.repl_state = REPL_STATE_RECEIVE_PSYNC; 1793 return; 1794 } 1795 1796 /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */ 1797 if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) { 1798 serverLog(LL_WARNING,"syncWithMaster(): state machine error, " 1799 "state should be RECEIVE_PSYNC but is %d", 1800 server.repl_state); 1801 goto error; 1802 } 1803 1804 psync_result = slaveTryPartialResynchronization(fd,1); 1805 if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ 1806 1807 /* If the master is in an transient error, we should try to PSYNC 1808 * from scratch later, so go to the error path. This happens when 1809 * the server is loading the dataset or is not connected with its 1810 * master and so forth. */ 1811 if (psync_result == PSYNC_TRY_LATER) goto error; 1812 1813 /* Note: if PSYNC does not return WAIT_REPLY, it will take care of 1814 * uninstalling the read handler from the file descriptor. 
*/ 1815 1816 if (psync_result == PSYNC_CONTINUE) { 1817 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization."); 1818 return; 1819 } 1820 1821 /* PSYNC failed or is not supported: we want our slaves to resync with us 1822 * as well, if we have any sub-slaves. The master may transfer us an 1823 * entirely different data set and we have no way to incrementally feed 1824 * our slaves after that. */ 1825 disconnectSlaves(); /* Force our slaves to resync with us as well. */ 1826 freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ 1827 1828 /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC 1829 * and the server.master_replid and master_initial_offset are 1830 * already populated. */ 1831 if (psync_result == PSYNC_NOT_SUPPORTED) { 1832 serverLog(LL_NOTICE,"Retrying with SYNC..."); 1833 if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { 1834 serverLog(LL_WARNING,"I/O error writing to MASTER: %s", 1835 strerror(errno)); 1836 goto error; 1837 } 1838 } 1839 1840 /* Prepare a suitable temp file for bulk transfer */ 1841 while(maxtries--) { 1842 snprintf(tmpfile,256, 1843 "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); 1844 dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); 1845 if (dfd != -1) break; 1846 sleep(1); 1847 } 1848 if (dfd == -1) { 1849 serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno)); 1850 goto error; 1851 } 1852 1853 /* Setup the non blocking download of the bulk file. 
*/ 1854 if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) 1855 == AE_ERR) 1856 { 1857 serverLog(LL_WARNING, 1858 "Can't create readable event for SYNC: %s (fd=%d)", 1859 strerror(errno),fd); 1860 goto error; 1861 } 1862 1863 server.repl_state = REPL_STATE_TRANSFER; 1864 server.repl_transfer_size = -1; 1865 server.repl_transfer_read = 0; 1866 server.repl_transfer_last_fsync_off = 0; 1867 server.repl_transfer_fd = dfd; 1868 server.repl_transfer_lastio = server.unixtime; 1869 server.repl_transfer_tmpfile = zstrdup(tmpfile); 1870 return; 1871 1872 error: 1873 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1874 if (dfd != -1) close(dfd); 1875 close(fd); 1876 server.repl_transfer_s = -1; 1877 server.repl_state = REPL_STATE_CONNECT; 1878 return; 1879 1880 write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */ 1881 serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err); 1882 sdsfree(err); 1883 goto error; 1884 } 1885 1886 int connectWithMaster(void) { 1887 int fd; 1888 1889 fd = anetTcpNonBlockBestEffortBindConnect(NULL, 1890 server.masterhost,server.masterport,NET_FIRST_BIND_ADDR); 1891 if (fd == -1) { 1892 serverLog(LL_WARNING,"Unable to connect to MASTER: %s", 1893 strerror(errno)); 1894 return C_ERR; 1895 } 1896 1897 if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == 1898 AE_ERR) 1899 { 1900 close(fd); 1901 serverLog(LL_WARNING,"Can't create readable event for SYNC"); 1902 return C_ERR; 1903 } 1904 1905 server.repl_transfer_lastio = server.unixtime; 1906 server.repl_transfer_s = fd; 1907 server.repl_state = REPL_STATE_CONNECTING; 1908 return C_OK; 1909 } 1910 1911 /* This function can be called when a non blocking connection is currently 1912 * in progress to undo it. 1913 * Never call this function directly, use cancelReplicationHandshake() instead. 
1914 */ 1915 void undoConnectWithMaster(void) { 1916 int fd = server.repl_transfer_s; 1917 1918 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1919 close(fd); 1920 server.repl_transfer_s = -1; 1921 } 1922 1923 /* Abort the async download of the bulk dataset while SYNC-ing with master. 1924 * Never call this function directly, use cancelReplicationHandshake() instead. 1925 */ 1926 void replicationAbortSyncTransfer(void) { 1927 serverAssert(server.repl_state == REPL_STATE_TRANSFER); 1928 undoConnectWithMaster(); 1929 close(server.repl_transfer_fd); 1930 unlink(server.repl_transfer_tmpfile); 1931 zfree(server.repl_transfer_tmpfile); 1932 } 1933 1934 /* This function aborts a non blocking replication attempt if there is one 1935 * in progress, by canceling the non-blocking connect attempt or 1936 * the initial bulk transfer. 1937 * 1938 * If there was a replication handshake in progress 1 is returned and 1939 * the replication state (server.repl_state) set to REPL_STATE_CONNECT. 1940 * 1941 * Otherwise zero is returned and no operation is perforemd at all. */ 1942 int cancelReplicationHandshake(void) { 1943 if (server.repl_state == REPL_STATE_TRANSFER) { 1944 replicationAbortSyncTransfer(); 1945 server.repl_state = REPL_STATE_CONNECT; 1946 } else if (server.repl_state == REPL_STATE_CONNECTING || 1947 slaveIsInHandshakeState()) 1948 { 1949 undoConnectWithMaster(); 1950 server.repl_state = REPL_STATE_CONNECT; 1951 } else { 1952 return 0; 1953 } 1954 return 1; 1955 } 1956 1957 /* Set replication to the specified master address and port. */ 1958 void replicationSetMaster(char *ip, int port) { 1959 int was_master = server.masterhost == NULL; 1960 1961 sdsfree(server.masterhost); 1962 server.masterhost = sdsnew(ip); 1963 server.masterport = port; 1964 if (server.master) { 1965 freeClient(server.master); 1966 } 1967 disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */ 1968 1969 /* Force our slaves to resync with us as well. 
They may hopefully be able 1970 * to partially resync with us, but we can notify the replid change. */ 1971 disconnectSlaves(); 1972 cancelReplicationHandshake(); 1973 /* Before destroying our master state, create a cached master using 1974 * our own parameters, to later PSYNC with the new master. */ 1975 if (was_master) replicationCacheMasterUsingMyself(); 1976 server.repl_state = REPL_STATE_CONNECT; 1977 } 1978 1979 /* Cancel replication, setting the instance as a master itself. */ 1980 void replicationUnsetMaster(void) { 1981 if (server.masterhost == NULL) return; /* Nothing to do. */ 1982 sdsfree(server.masterhost); 1983 server.masterhost = NULL; 1984 /* When a slave is turned into a master, the current replication ID 1985 * (that was inherited from the master at synchronization time) is 1986 * used as secondary ID up to the current offset, and a new replication 1987 * ID is created to continue with a new replication history. */ 1988 shiftReplicationId(); 1989 if (server.master) freeClient(server.master); 1990 replicationDiscardCachedMaster(); 1991 cancelReplicationHandshake(); 1992 /* Disconnecting all the slaves is required: we need to inform slaves 1993 * of the replication ID change (see shiftReplicationId() call). However 1994 * the slaves will be able to partially resync with us, so it will be 1995 * a very fast reconnection. */ 1996 disconnectSlaves(); 1997 server.repl_state = REPL_STATE_NONE; 1998 1999 /* We need to make sure the new master will start the replication stream 2000 * with a SELECT statement. This is forced after a full resync, but 2001 * with PSYNC version 2, there is no need for full resync after a 2002 * master switch. */ 2003 server.slaveseldb = -1; 2004 2005 /* Once we turn from slave to master, we consider the starting time without 2006 * slaves (that is used to count the replication backlog time to live) as 2007 * starting from now. Otherwise the backlog will be freed after a 2008 * failover if slaves do not connect immediately. 
*/ 2009 server.repl_no_slaves_since = server.unixtime; 2010 } 2011 2012 /* This function is called when the slave lose the connection with the 2013 * master into an unexpected way. */ 2014 void replicationHandleMasterDisconnection(void) { 2015 server.master = NULL; 2016 server.repl_state = REPL_STATE_CONNECT; 2017 server.repl_down_since = server.unixtime; 2018 /* We lost connection with our master, don't disconnect slaves yet, 2019 * maybe we'll be able to PSYNC with our master later. We'll disconnect 2020 * the slaves only if we'll have to do a full resync with our master. */ 2021 } 2022 2023 void replicaofCommand(client *c) { 2024 /* SLAVEOF is not allowed in cluster mode as replication is automatically 2025 * configured using the current address of the master node. */ 2026 if (server.cluster_enabled) { 2027 addReplyError(c,"REPLICAOF not allowed in cluster mode."); 2028 return; 2029 } 2030 2031 /* The special host/port combination "NO" "ONE" turns the instance 2032 * into a master. Otherwise the new master address is set. 
*/ 2033 if (!strcasecmp(c->argv[1]->ptr,"no") && 2034 !strcasecmp(c->argv[2]->ptr,"one")) { 2035 if (server.masterhost) { 2036 replicationUnsetMaster(); 2037 sds client = catClientInfoString(sdsempty(),c); 2038 serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", 2039 client); 2040 sdsfree(client); 2041 } 2042 } else { 2043 long port; 2044 2045 if (c->flags & CLIENT_SLAVE) 2046 { 2047 /* If a client is already a replica they cannot run this command, 2048 * because it involves flushing all replicas (including this 2049 * client) */ 2050 addReplyError(c, "Command is not valid when client is a replica."); 2051 return; 2052 } 2053 2054 if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK)) 2055 return; 2056 2057 /* Check if we are already attached to the specified slave */ 2058 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) 2059 && server.masterport == port) { 2060 serverLog(LL_NOTICE,"REPLICAOF would result into synchronization with the master we are already connected with. No operation performed."); 2061 addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); 2062 return; 2063 } 2064 /* There was no previous master or the user specified a different one, 2065 * we can continue. */ 2066 replicationSetMaster(c->argv[1]->ptr, port); 2067 sds client = catClientInfoString(sdsempty(),c); 2068 serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')", 2069 server.masterhost, server.masterport, client); 2070 sdsfree(client); 2071 } 2072 addReply(c,shared.ok); 2073 } 2074 2075 /* ROLE command: provide information about the role of the instance 2076 * (master or slave) and additional information related to replication 2077 * in an easy to process format. 
*/ 2078 void roleCommand(client *c) { 2079 if (server.masterhost == NULL) { 2080 listIter li; 2081 listNode *ln; 2082 void *mbcount; 2083 int slaves = 0; 2084 2085 addReplyMultiBulkLen(c,3); 2086 addReplyBulkCBuffer(c,"master",6); 2087 addReplyLongLong(c,server.master_repl_offset); 2088 mbcount = addDeferredMultiBulkLength(c); 2089 listRewind(server.slaves,&li); 2090 while((ln = listNext(&li))) { 2091 client *slave = ln->value; 2092 char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip; 2093 2094 if (slaveip[0] == '\0') { 2095 if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1) 2096 continue; 2097 slaveip = ip; 2098 } 2099 if (slave->replstate != SLAVE_STATE_ONLINE) continue; 2100 addReplyMultiBulkLen(c,3); 2101 addReplyBulkCString(c,slaveip); 2102 addReplyBulkLongLong(c,slave->slave_listening_port); 2103 addReplyBulkLongLong(c,slave->repl_ack_off); 2104 slaves++; 2105 } 2106 setDeferredMultiBulkLength(c,mbcount,slaves); 2107 } else { 2108 char *slavestate = NULL; 2109 2110 addReplyMultiBulkLen(c,5); 2111 addReplyBulkCBuffer(c,"slave",5); 2112 addReplyBulkCString(c,server.masterhost); 2113 addReplyLongLong(c,server.masterport); 2114 if (slaveIsInHandshakeState()) { 2115 slavestate = "handshake"; 2116 } else { 2117 switch(server.repl_state) { 2118 case REPL_STATE_NONE: slavestate = "none"; break; 2119 case REPL_STATE_CONNECT: slavestate = "connect"; break; 2120 case REPL_STATE_CONNECTING: slavestate = "connecting"; break; 2121 case REPL_STATE_TRANSFER: slavestate = "sync"; break; 2122 case REPL_STATE_CONNECTED: slavestate = "connected"; break; 2123 default: slavestate = "unknown"; break; 2124 } 2125 } 2126 addReplyBulkCString(c,slavestate); 2127 addReplyLongLong(c,server.master ? server.master->reploff : -1); 2128 } 2129 } 2130 2131 /* Send a REPLCONF ACK command to the master to inform it about the current 2132 * processed offset. If we are not connected with a master, the command has 2133 * no effects. 
 *
 * CLIENT_MASTER_FORCE_REPLY is set only around the reply calls and cleared
 * right after. NOTE(review): presumably replies to the master client are
 * otherwise suppressed by the reply layer; confirm against addReply().
 */
void replicationSendAck(void) {
    client *c = server.master;

    if (c != NULL) {
        c->flags |= CLIENT_MASTER_FORCE_REPLY;
        addReplyMultiBulkLen(c,3);
        addReplyBulkCString(c,"REPLCONF");
        addReplyBulkCString(c,"ACK");
        addReplyBulkLongLong(c,c->reploff);
        c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
    }
}

/* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */

/* In order to implement partial synchronization we need to be able to cache
 * our master's client structure after a transient disconnection.
 * It is cached into server.cached_master and flushed away using the following
 * functions. */

/* This function is called by freeClient() in order to cache the master
 * client structure instead of destroying it. freeClient() will return
 * ASAP after this function returns, so every action needed to avoid problems
 * with a client that is really "suspended" has to be done by this function.
 *
 * The other functions that will deal with the cached master are:
 *
 * replicationDiscardCachedMaster() that will make sure to kill the client
 * as for some reason we don't want to use it in the future.
 *
 * replicationResurrectCachedMaster() that is used after a successful PSYNC
 * handshake in order to reactivate the cached master.
 *
 * NOTE(review): the body reads the buffers via server.master but the
 * transaction/reply state via 'c'; this assumes c == server.master.
 */
void replicationCacheMaster(client *c) {
    serverAssert(server.master != NULL && server.cached_master == NULL);
    serverLog(LL_NOTICE,"Caching the disconnected master state.");

    /* Unlink the client from the server structures. */
    unlinkClient(c);

    /* Reset the master client so that's ready to accept new commands:
     * we want to discard the non processed query buffers and non processed
     * offsets, including pending transactions, already populated arguments,
     * pending outputs to the master. */
    sdsclear(server.master->querybuf);
    sdsclear(server.master->pending_querybuf);
    /* Bytes read but not applied are dropped: rewind the read offset to the
     * last applied one, so a later PSYNC resumes from there. */
    server.master->read_reploff = server.master->reploff;
    if (c->flags & CLIENT_MULTI) discardTransaction(c);
    listEmpty(c->reply);
    c->sentlen = 0;
    c->reply_bytes = 0;
    c->bufpos = 0;
    resetClient(c);

    /* Save the master. Server.master will be set to null later by
     * replicationHandleMasterDisconnection(). */
    server.cached_master = server.master;

    /* Invalidate the Peer ID cache. */
    if (c->peerid) {
        sdsfree(c->peerid);
        c->peerid = NULL;
    }

    /* Caching the master happens instead of the actual freeClient() call,
     * so make sure to adjust the replication state. This function will
     * also set server.master to NULL. */
    replicationHandleMasterDisconnection();
}

/* This function is called when a master is turned into a slave, in order to
 * create from scratch a cached master for the new client, that will allow
 * to PSYNC with the slave that was promoted as the new master after a
 * failover.
 *
 * Assuming this instance was previously the master instance of the new master,
 * the new master will accept its replication ID, and potentially also the
 * current offset if no data was lost during the failover. So we use our
 * current replication ID and offset in order to synthesize a cached master. */
void replicationCacheMasterUsingMyself(void) {
    /* The master client we create can be set to any DBID, because
     * the new master will start its replication stream with SELECT. */
    server.master_initial_offset = server.master_repl_offset;
    replicationCreateMasterClient(-1,-1);

    /* Use our own ID / offset. */
    memcpy(server.master->replid, server.replid, sizeof(server.replid));

    /* Set as cached master. */
    unlinkClient(server.master);
    server.cached_master = server.master;
    server.master = NULL;
    serverLog(LL_NOTICE,"Before turning into a replica, using my master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer.");
}

/* Free a cached master, called when there are no longer the conditions for
 * a partial resync on reconnection. No-op if there is no cached master. */
void replicationDiscardCachedMaster(void) {
    if (server.cached_master == NULL) return;

    serverLog(LL_NOTICE,"Discarding previously cached master state.");
    /* Clear CLIENT_MASTER first so freeClient() really frees the client
     * instead of routing it back into replicationCacheMaster(). */
    server.cached_master->flags &= ~CLIENT_MASTER;
    freeClient(server.cached_master);
    server.cached_master = NULL;
}

/* Turn the cached master into the current master, using the file descriptor
 * passed as argument as the socket for the new master.
 *
 * This function is called when successfully setup a partial resynchronization
 * so the stream of data that we'll receive will start from where this
 * master left.
 *
 * If the read (or, when replies are pending, the write) handler cannot be
 * installed, the error is logged and the master is scheduled for
 * asynchronous freeing. */
void replicationResurrectCachedMaster(int newfd) {
    server.master = server.cached_master;
    server.cached_master = NULL;
    server.master->fd = newfd;
    server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
    server.master->authenticated = 1;
    server.master->lastinteraction = server.unixtime;
    server.repl_state = REPL_STATE_CONNECTED;
    server.repl_down_since = 0;

    /* Re-add to the list of clients. */
    linkClient(server.master);
    if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
                          readQueryFromClient, server.master)) {
        serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
        freeClientAsync(server.master); /* Close ASAP. */
    }

    /* We may also need to install the write handler as well if there is
     * pending data in the write buffers. */
    if (clientHasPendingReplies(server.master)) {
        if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
                          sendReplyToClient, server.master)) {
            serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
            freeClientAsync(server.master); /* Close ASAP. */
        }
    }
}

/* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */

/* This function counts the number of slaves with lag <= min-slaves-max-lag.
 * If the option is active, the server will prevent writes if there are not
 * enough connected slaves with the specified lag (or less).
 * The result is stored in server.repl_good_slaves_count; when either option
 * is zero the function returns without touching it. */
void refreshGoodSlavesCount(void) {
    listIter li;
    listNode *ln;
    int good = 0;

    if (!server.repl_min_slaves_to_write ||
        !server.repl_min_slaves_max_lag) return;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;
        /* Seconds since this replica's last ACK. */
        time_t lag = server.unixtime - slave->repl_ack_time;

        if (slave->replstate == SLAVE_STATE_ONLINE &&
            lag <= server.repl_min_slaves_max_lag) good++;
    }
    server.repl_good_slaves_count = good;
}

/* ----------------------- REPLICATION SCRIPT CACHE --------------------------
 * The goal of this code is to keep track of scripts already sent to every
 * connected slave, in order to be able to replicate EVALSHA as it is without
 * translating it to EVAL every time it is possible.
 *
 * We use a capped collection implemented by a hash table for fast lookup
 * of scripts we can send as EVALSHA, plus a linked list that is used for
 * eviction of the oldest entry when the max number of items is reached.
2307 * 2308 * We don't care about taking a different cache for every different slave 2309 * since to fill the cache again is not very costly, the goal of this code 2310 * is to avoid that the same big script is trasmitted a big number of times 2311 * per second wasting bandwidth and processor speed, but it is not a problem 2312 * if we need to rebuild the cache from scratch from time to time, every used 2313 * script will need to be transmitted a single time to reappear in the cache. 2314 * 2315 * This is how the system works: 2316 * 2317 * 1) Every time a new slave connects, we flush the whole script cache. 2318 * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without 2319 * trying to convert EVAL into EVALSHA specifically for slaves. 2320 * 3) Every time we trasmit a script as EVAL to the slaves, we also add the 2321 * corresponding SHA1 of the script into the cache as we are sure every 2322 * slave knows about the script starting from now. 2323 * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves 2324 * and at the same time flush the script cache. 2325 * 5) When the last slave disconnects, flush the cache. 2326 * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded 2327 * in the master sometimes. 2328 */ 2329 2330 /* Initialize the script cache, only called at startup. */ 2331 void replicationScriptCacheInit(void) { 2332 server.repl_scriptcache_size = 10000; 2333 server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL); 2334 server.repl_scriptcache_fifo = listCreate(); 2335 } 2336 2337 /* Empty the script cache. Should be called every time we are no longer sure 2338 * that every slave knows about all the scripts in our set, or when the 2339 * current AOF "context" is no longer aware of the script. In general we 2340 * should flush the cache: 2341 * 2342 * 1) Every time a new slave reconnects to this master and performs a 2343 * full SYNC (PSYNC does not require flushing). 
2344 * 2) Every time an AOF rewrite is performed. 2345 * 3) Every time we are left without slaves at all, and AOF is off, in order 2346 * to reclaim otherwise unused memory. 2347 */ 2348 void replicationScriptCacheFlush(void) { 2349 dictEmpty(server.repl_scriptcache_dict,NULL); 2350 listRelease(server.repl_scriptcache_fifo); 2351 server.repl_scriptcache_fifo = listCreate(); 2352 } 2353 2354 /* Add an entry into the script cache, if we reach max number of entries the 2355 * oldest is removed from the list. */ 2356 void replicationScriptCacheAdd(sds sha1) { 2357 int retval; 2358 sds key = sdsdup(sha1); 2359 2360 /* Evict oldest. */ 2361 if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size) 2362 { 2363 listNode *ln = listLast(server.repl_scriptcache_fifo); 2364 sds oldest = listNodeValue(ln); 2365 2366 retval = dictDelete(server.repl_scriptcache_dict,oldest); 2367 serverAssert(retval == DICT_OK); 2368 listDelNode(server.repl_scriptcache_fifo,ln); 2369 } 2370 2371 /* Add current. */ 2372 retval = dictAdd(server.repl_scriptcache_dict,key,NULL); 2373 listAddNodeHead(server.repl_scriptcache_fifo,key); 2374 serverAssert(retval == DICT_OK); 2375 } 2376 2377 /* Returns non-zero if the specified entry exists inside the cache, that is, 2378 * if all the slaves are aware of this script SHA1. */ 2379 int replicationScriptCacheExists(sds sha1) { 2380 return dictFind(server.repl_scriptcache_dict,sha1) != NULL; 2381 } 2382 2383 /* ----------------------- SYNCHRONOUS REPLICATION -------------------------- 2384 * Redis synchronous replication design can be summarized in points: 2385 * 2386 * - Redis masters have a global replication offset, used by PSYNC. 2387 * - Master increment the offset every time new commands are sent to slaves. 2388 * - Slaves ping back masters with the offset processed so far. 
2389 * 2390 * So synchronous replication adds a new WAIT command in the form: 2391 * 2392 * WAIT <num_replicas> <milliseconds_timeout> 2393 * 2394 * That returns the number of replicas that processed the query when 2395 * we finally have at least num_replicas, or when the timeout was 2396 * reached. 2397 * 2398 * The command is implemented in this way: 2399 * 2400 * - Every time a client processes a command, we remember the replication 2401 * offset after sending that command to the slaves. 2402 * - When WAIT is called, we ask slaves to send an acknowledgement ASAP. 2403 * The client is blocked at the same time (see blocked.c). 2404 * - Once we receive enough ACKs for a given offset or when the timeout 2405 * is reached, the WAIT command is unblocked and the reply sent to the 2406 * client. 2407 */ 2408 2409 /* This just set a flag so that we broadcast a REPLCONF GETACK command 2410 * to all the slaves in the beforeSleep() function. Note that this way 2411 * we "group" all the clients that want to wait for synchronouns replication 2412 * in a given event loop iteration, and send a single GETACK for them all. */ 2413 void replicationRequestAckFromSlaves(void) { 2414 server.get_ack_from_slaves = 1; 2415 } 2416 2417 /* Return the number of slaves that already acknowledged the specified 2418 * replication offset. */ 2419 int replicationCountAcksByOffset(long long offset) { 2420 listIter li; 2421 listNode *ln; 2422 int count = 0; 2423 2424 listRewind(server.slaves,&li); 2425 while((ln = listNext(&li))) { 2426 client *slave = ln->value; 2427 2428 if (slave->replstate != SLAVE_STATE_ONLINE) continue; 2429 if (slave->repl_ack_off >= offset) count++; 2430 } 2431 return count; 2432 } 2433 2434 /* WAIT for N replicas to acknowledge the processing of our latest 2435 * write command (and all the previous commands). 
*/ 2436 void waitCommand(client *c) { 2437 mstime_t timeout; 2438 long numreplicas, ackreplicas; 2439 long long offset = c->woff; 2440 2441 if (server.masterhost) { 2442 addReplyError(c,"WAIT cannot be used with replica instances. Please also note that since Redis 4.0 if a replica is configured to be writable (which is not the default) writes to replicas are just local and are not propagated."); 2443 return; 2444 } 2445 2446 /* Argument parsing. */ 2447 if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK) 2448 return; 2449 if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS) 2450 != C_OK) return; 2451 2452 /* First try without blocking at all. */ 2453 ackreplicas = replicationCountAcksByOffset(c->woff); 2454 if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) { 2455 addReplyLongLong(c,ackreplicas); 2456 return; 2457 } 2458 2459 /* Otherwise block the client and put it into our list of clients 2460 * waiting for ack from slaves. */ 2461 c->bpop.timeout = timeout; 2462 c->bpop.reploffset = offset; 2463 c->bpop.numreplicas = numreplicas; 2464 listAddNodeTail(server.clients_waiting_acks,c); 2465 blockClient(c,BLOCKED_WAIT); 2466 2467 /* Make sure that the server will send an ACK request to all the slaves 2468 * before returning to the event loop. */ 2469 replicationRequestAckFromSlaves(); 2470 } 2471 2472 /* This is called by unblockClient() to perform the blocking op type 2473 * specific cleanup. We just remove the client from the list of clients 2474 * waiting for replica acks. Never call it directly, call unblockClient() 2475 * instead. */ 2476 void unblockClientWaitingReplicas(client *c) { 2477 listNode *ln = listSearchKey(server.clients_waiting_acks,c); 2478 serverAssert(ln != NULL); 2479 listDelNode(server.clients_waiting_acks,ln); 2480 } 2481 2482 /* Check if there are clients blocked in WAIT that can be unblocked since 2483 * we received enough ACKs from slaves. 
*/ 2484 void processClientsWaitingReplicas(void) { 2485 long long last_offset = 0; 2486 int last_numreplicas = 0; 2487 2488 listIter li; 2489 listNode *ln; 2490 2491 listRewind(server.clients_waiting_acks,&li); 2492 while((ln = listNext(&li))) { 2493 client *c = ln->value; 2494 2495 /* Every time we find a client that is satisfied for a given 2496 * offset and number of replicas, we remember it so the next client 2497 * may be unblocked without calling replicationCountAcksByOffset() 2498 * if the requested offset / replicas were equal or less. */ 2499 if (last_offset && last_offset > c->bpop.reploffset && 2500 last_numreplicas > c->bpop.numreplicas) 2501 { 2502 unblockClient(c); 2503 addReplyLongLong(c,last_numreplicas); 2504 } else { 2505 int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset); 2506 2507 if (numreplicas >= c->bpop.numreplicas) { 2508 last_offset = c->bpop.reploffset; 2509 last_numreplicas = numreplicas; 2510 unblockClient(c); 2511 addReplyLongLong(c,numreplicas); 2512 } 2513 } 2514 } 2515 } 2516 2517 /* Return the slave replication offset for this instance, that is 2518 * the offset for which we already processed the master replication stream. */ 2519 long long replicationGetSlaveOffset(void) { 2520 long long offset = 0; 2521 2522 if (server.masterhost != NULL) { 2523 if (server.master) { 2524 offset = server.master->reploff; 2525 } else if (server.cached_master) { 2526 offset = server.cached_master->reploff; 2527 } 2528 } 2529 /* offset may be -1 when the master does not support it at all, however 2530 * this function is designed to return an offset that can express the 2531 * amount of data processed by the master, so we return a positive 2532 * integer. */ 2533 if (offset < 0) offset = 0; 2534 return offset; 2535 } 2536 2537 /* --------------------------- REPLICATION CRON ---------------------------- */ 2538 2539 /* Replication cron function, called 1 time per second. 
*/
/* Each call performs, in this order: replica-side link timeouts and
 * (re)connection to our master, periodic ACKs to our master, PINGs and
 * keep-alive newlines to our own replicas, disconnection of timed out
 * replicas, reclamation of the backlog and script cache when no replica
 * is attached, and possibly a BGSAVE for replicas waiting for a full
 * sync. The PING cadence is derived from replication_cron_loops, which
 * is incremented once per call (expected to be 1 Hz). */
void replicationCron(void) {
    static long long replication_cron_loops = 0;
    /* A single iterator pair is reused by every replica loop below. The
     * inner blocks previously redeclared (shadowed) these variables. */
    listIter li;
    listNode *ln;
    robj *ping_argv[1];

    /* Non blocking connection timeout? */
    if (server.masterhost &&
        (server.repl_state == REPL_STATE_CONNECTING ||
         slaveIsInHandshakeState()) &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
        cancelReplicationHandshake();
    }

    /* Bulk transfer I/O timeout? */
    if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
        cancelReplicationHandshake();
    }

    /* Timed out master when we are an already connected slave? */
    if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
        freeClient(server.master);
    }

    /* Check if we should connect to a MASTER */
    if (server.repl_state == REPL_STATE_CONNECT) {
        serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == C_OK) {
            serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
        }
    }

    /* Send ACK to master from time to time.
     * Note that we do not send periodic acks to masters that don't
     * support PSYNC and replication offsets. */
    if (server.masterhost && server.master &&
        !(server.master->flags & CLIENT_PRE_PSYNC))
        replicationSendAck();

    /* If we have attached slaves, PING them from time to time.
     * So slaves can implement an explicit timeout to masters, and will
     * be able to detect a link disconnection even if the TCP connection
     * will not actually go down. */

    /* First, send PING according to ping_slave_period. */
    if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
        listLength(server.slaves))
    {
        /* Note that we don't send the PING if the clients are paused during
         * a Redis Cluster manual failover: the PING we send will otherwise
         * alter the replication offsets of master and slave, and will no longer
         * match the one stored into 'mf_master_offset' state. */
        int manual_failover_in_progress =
            server.cluster_enabled &&
            server.cluster->mf_end &&
            clientsArePaused();

        if (!manual_failover_in_progress) {
            ping_argv[0] = createStringObject("PING",4);
            replicationFeedSlaves(server.slaves, server.slaveseldb,
                ping_argv, 1);
            decrRefCount(ping_argv[0]);
        }
    }

    /* Second, send a newline to all the slaves in pre-synchronization
     * stage, that is, slaves waiting for the master to create the RDB file:
     * replicas waiting for the BGSAVE to start, and replicas waiting for a
     * disk-targeted BGSAVE to end (with a socket target the replica is
     * already receiving the payload).
     *
     * Such newlines are out of band: they do not alter the replication
     * offsets. The newline will be ignored by the slave but will refresh the
     * last interaction timer preventing a timeout. In this case we ignore the
     * ping period and refresh the connection once per second since certain
     * timeouts are set at a few seconds (example: PSYNC response).
     *
     * NOTE(review): an older revision of this comment said newlines are also
     * sent to chained replicas after we lose our master; the code below does
     * not do that, only pre-sync replicas are handled here. */
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        int is_presync =
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
             server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));

        if (is_presync) {
            if (write(slave->fd, "\n", 1) == -1) {
                /* Don't worry about socket errors, it's just a ping. */
            }
        }
    }

    /* Disconnect timedout slaves: online, PSYNC-capable replicas whose last
     * REPLCONF ACK is older than repl_timeout. freeClient() removes the
     * replica from server.slaves while we iterate. */
    if (listLength(server.slaves)) {
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate != SLAVE_STATE_ONLINE) continue;
            if (slave->flags & CLIENT_PRE_PSYNC) continue;
            if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
            {
                serverLog(LL_WARNING, "Disconnecting timedout replica: %s",
                    replicationGetSlaveName(slave));
                freeClient(slave);
            }
        }
    }

    /* If this is a master without attached slaves and there is a replication
     * backlog active, in order to reclaim memory we can free it after some
     * (configured) time. Note that this cannot be done for slaves: slaves
     * without sub-slaves attached should still accumulate data into the
     * backlog, in order to reply to PSYNC queries if they are turned into
     * masters after a failover. */
    if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
        server.repl_backlog && server.masterhost == NULL)
    {
        time_t idle = server.unixtime - server.repl_no_slaves_since;

        if (idle > server.repl_backlog_time_limit) {
            /* When we free the backlog, we always use a new
             * replication ID and clear the ID2. This is needed
             * because when there is no backlog, the master_repl_offset
             * is not updated, but we would still retain our replication
             * ID, leading to the following problem:
             *
             * 1. We are a master instance.
             * 2. Our slave is promoted to master. Its repl-id-2 will
             *    be the same as our repl-id.
             * 3. We, yet as master, receive some updates, that will not
             *    increment the master_repl_offset.
             * 4. Later we are turned into a slave, connect to the new
             *    master that will accept our PSYNC request by second
             *    replication ID, but there will be data inconsistency
             *    because we received writes. */
            changeReplicationId();
            clearReplicationId2();
            freeReplicationBacklog();
            serverLog(LL_NOTICE,
                "Replication backlog freed after %d seconds "
                "without connected replicas.",
                (int) server.repl_backlog_time_limit);
        }
    }

    /* If AOF is disabled and we no longer have attached slaves, we can
     * free our Replication Script Cache as there is no need to propagate
     * EVALSHA at all. */
    if (listLength(server.slaves) == 0 &&
        server.aof_state == AOF_OFF &&
        listLength(server.repl_scriptcache_fifo) != 0)
    {
        replicationScriptCacheFlush();
    }

    /* Start a BGSAVE good for replication if we have slaves in
     * WAIT_BGSAVE_START state.
     *
     * In case of diskless replication, we make sure to wait the specified
     * number of seconds (according to configuration) so that other slaves
     * have the time to arrive before we start streaming. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
        time_t idle, max_idle = 0;
        int slaves_waiting = 0;
        /* Intersection of the capabilities of all waiting replicas;
         * -1 until the first waiting replica is seen. */
        int mincapa = -1;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                idle = server.unixtime - slave->lastinteraction;
                if (idle > max_idle) max_idle = idle;
                slaves_waiting++;
                mincapa = (mincapa == -1) ? slave->slave_capa :
                                            (mincapa & slave->slave_capa);
            }
        }

        if (slaves_waiting &&
            (!server.repl_diskless_sync ||
             max_idle > server.repl_diskless_sync_delay))
        {
            /* Start the BGSAVE. The called function may start a
             * BGSAVE with socket target or disk target depending on the
             * configuration and slaves capabilities. */
            startBgsaveForReplication(mincapa);
        }
    }

    /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
    refreshGoodSlavesCount();
    replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}