1 /* Asynchronous replication implementation. 2 * 3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * * Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * * Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * * Neither the name of Redis nor the names of its contributors may be used 15 * to endorse or promote products derived from this software without 16 * specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 
29 */ 30 31 32 #include "redis.h" 33 34 #include <sys/time.h> 35 #include <unistd.h> 36 #include <fcntl.h> 37 #include <sys/socket.h> 38 #include <sys/stat.h> 39 40 void replicationDiscardCachedMaster(void); 41 void replicationResurrectCachedMaster(int newfd); 42 void replicationSendAck(void); 43 void putSlaveOnline(redisClient *slave); 44 45 /* --------------------------- Utility functions ---------------------------- */ 46 47 /* Return the pointer to a string representing the slave ip:listening_port 48 * pair. Mostly useful for logging, since we want to log a slave using its 49 * IP address and it's listening port which is more clear for the user, for 50 * example: "Closing connection with slave 10.1.2.3:6380". */ 51 char *replicationGetSlaveName(redisClient *c) { 52 static char buf[REDIS_PEER_ID_LEN]; 53 char ip[REDIS_IP_STR_LEN]; 54 55 ip[0] = '\0'; 56 buf[0] = '\0'; 57 if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) { 58 if (c->slave_listening_port) 59 anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port); 60 else 61 snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip); 62 } else { 63 snprintf(buf,sizeof(buf),"client id #%llu", 64 (unsigned long long) c->id); 65 } 66 return buf; 67 } 68 69 /* ---------------------------------- MASTER -------------------------------- */ 70 71 void createReplicationBacklog(void) { 72 redisAssert(server.repl_backlog == NULL); 73 server.repl_backlog = zmalloc(server.repl_backlog_size); 74 server.repl_backlog_histlen = 0; 75 server.repl_backlog_idx = 0; 76 /* When a new backlog buffer is created, we increment the replication 77 * offset by one to make sure we'll not be able to PSYNC with any 78 * previous slave. This is needed because we avoid incrementing the 79 * master_repl_offset if no backlog exists nor slaves are attached. 
*/ 80 server.master_repl_offset++; 81 82 /* We don't have any data inside our buffer, but virtually the first 83 * byte we have is the next byte that will be generated for the 84 * replication stream. */ 85 server.repl_backlog_off = server.master_repl_offset+1; 86 } 87 88 /* This function is called when the user modifies the replication backlog 89 * size at runtime. It is up to the function to both update the 90 * server.repl_backlog_size and to resize the buffer and setup it so that 91 * it contains the same data as the previous one (possibly less data, but 92 * the most recent bytes, or the same data and more free space in case the 93 * buffer is enlarged). */ 94 void resizeReplicationBacklog(long long newsize) { 95 if (newsize < REDIS_REPL_BACKLOG_MIN_SIZE) 96 newsize = REDIS_REPL_BACKLOG_MIN_SIZE; 97 if (server.repl_backlog_size == newsize) return; 98 99 server.repl_backlog_size = newsize; 100 if (server.repl_backlog != NULL) { 101 /* What we actually do is to flush the old buffer and realloc a new 102 * empty one. It will refill with new data incrementally. 103 * The reason is that copying a few gigabytes adds latency and even 104 * worse often we need to alloc additional space before freeing the 105 * old buffer. */ 106 zfree(server.repl_backlog); 107 server.repl_backlog = zmalloc(server.repl_backlog_size); 108 server.repl_backlog_histlen = 0; 109 server.repl_backlog_idx = 0; 110 /* Next byte we have is... the next since the buffer is empty. */ 111 server.repl_backlog_off = server.master_repl_offset+1; 112 } 113 } 114 115 void freeReplicationBacklog(void) { 116 redisAssert(listLength(server.slaves) == 0); 117 zfree(server.repl_backlog); 118 server.repl_backlog = NULL; 119 } 120 121 /* Add data to the replication backlog. 122 * This function also increments the global replication offset stored at 123 * server.master_repl_offset, because there is no case where we want to feed 124 * the backlog without incrementing the buffer. 
*/ 125 void feedReplicationBacklog(void *ptr, size_t len) { 126 unsigned char *p = ptr; 127 128 server.master_repl_offset += len; 129 130 /* This is a circular buffer, so write as much data we can at every 131 * iteration and rewind the "idx" index if we reach the limit. */ 132 while(len) { 133 size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; 134 if (thislen > len) thislen = len; 135 memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); 136 server.repl_backlog_idx += thislen; 137 if (server.repl_backlog_idx == server.repl_backlog_size) 138 server.repl_backlog_idx = 0; 139 len -= thislen; 140 p += thislen; 141 server.repl_backlog_histlen += thislen; 142 } 143 if (server.repl_backlog_histlen > server.repl_backlog_size) 144 server.repl_backlog_histlen = server.repl_backlog_size; 145 /* Set the offset of the first byte we have in the backlog. */ 146 server.repl_backlog_off = server.master_repl_offset - 147 server.repl_backlog_histlen + 1; 148 } 149 150 /* Wrapper for feedReplicationBacklog() that takes Redis string objects 151 * as input. */ 152 void feedReplicationBacklogWithObject(robj *o) { 153 char llstr[REDIS_LONGSTR_SIZE]; 154 void *p; 155 size_t len; 156 157 if (o->encoding == REDIS_ENCODING_INT) { 158 len = ll2string(llstr,sizeof(llstr),(long)o->ptr); 159 p = llstr; 160 } else { 161 len = sdslen(o->ptr); 162 p = o->ptr; 163 } 164 feedReplicationBacklog(p,len); 165 } 166 167 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { 168 listNode *ln; 169 listIter li; 170 int j, len; 171 char llstr[REDIS_LONGSTR_SIZE]; 172 173 /* If there aren't slaves, and there is no backlog buffer to populate, 174 * we can return ASAP. */ 175 if (server.repl_backlog == NULL && listLength(slaves) == 0) return; 176 177 /* We can't have slaves attached and no backlog. */ 178 redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); 179 180 /* Send SELECT command to every slave if needed. 
*/ 181 if (server.slaveseldb != dictid) { 182 robj *selectcmd; 183 184 /* For a few DBs we have pre-computed SELECT command. */ 185 if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) { 186 selectcmd = shared.select[dictid]; 187 } else { 188 int dictid_len; 189 190 dictid_len = ll2string(llstr,sizeof(llstr),dictid); 191 selectcmd = createObject(REDIS_STRING, 192 sdscatprintf(sdsempty(), 193 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", 194 dictid_len, llstr)); 195 } 196 197 /* Add the SELECT command into the backlog. */ 198 if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); 199 200 /* Send it to slaves. */ 201 listRewind(slaves,&li); 202 while((ln = listNext(&li))) { 203 redisClient *slave = ln->value; 204 addReply(slave,selectcmd); 205 } 206 207 if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS) 208 decrRefCount(selectcmd); 209 } 210 server.slaveseldb = dictid; 211 212 /* Write the command to the replication backlog if any. */ 213 if (server.repl_backlog) { 214 char aux[REDIS_LONGSTR_SIZE+3]; 215 216 /* Add the multi bulk reply length. */ 217 aux[0] = '*'; 218 len = ll2string(aux+1,sizeof(aux)-1,argc); 219 aux[len+1] = '\r'; 220 aux[len+2] = '\n'; 221 feedReplicationBacklog(aux,len+3); 222 223 for (j = 0; j < argc; j++) { 224 long objlen = stringObjectLen(argv[j]); 225 226 /* We need to feed the buffer with the object as a bulk reply 227 * not just as a plain string, so create the $..CRLF payload len 228 * and add the final CRLF */ 229 aux[0] = '$'; 230 len = ll2string(aux+1,sizeof(aux)-1,objlen); 231 aux[len+1] = '\r'; 232 aux[len+2] = '\n'; 233 feedReplicationBacklog(aux,len+3); 234 feedReplicationBacklogWithObject(argv[j]); 235 feedReplicationBacklog(aux+len+1,2); 236 } 237 } 238 239 /* Write the command to every slave. 
*/ 240 listRewind(server.slaves,&li); 241 while((ln = listNext(&li))) { 242 redisClient *slave = ln->value; 243 244 /* Don't feed slaves that are still waiting for BGSAVE to start */ 245 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue; 246 247 /* Feed slaves that are waiting for the initial SYNC (so these commands 248 * are queued in the output buffer until the initial SYNC completes), 249 * or are already in sync with the master. */ 250 251 /* Add the multi bulk length. */ 252 addReplyMultiBulkLen(slave,argc); 253 254 /* Finally any additional argument that was not stored inside the 255 * static buffer if any (from j to argc). */ 256 for (j = 0; j < argc; j++) 257 addReplyBulk(slave,argv[j]); 258 } 259 } 260 261 void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) { 262 listNode *ln; 263 listIter li; 264 int j; 265 sds cmdrepr = sdsnew("+"); 266 robj *cmdobj; 267 struct timeval tv; 268 269 gettimeofday(&tv,NULL); 270 cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); 271 if (c->flags & REDIS_LUA_CLIENT) { 272 cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); 273 } else if (c->flags & REDIS_UNIX_SOCKET) { 274 cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); 275 } else { 276 cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c)); 277 } 278 279 for (j = 0; j < argc; j++) { 280 if (argv[j]->encoding == REDIS_ENCODING_INT) { 281 cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr); 282 } else { 283 cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, 284 sdslen(argv[j]->ptr)); 285 } 286 if (j != argc-1) 287 cmdrepr = sdscatlen(cmdrepr," ",1); 288 } 289 cmdrepr = sdscatlen(cmdrepr,"\r\n",2); 290 cmdobj = createObject(REDIS_STRING,cmdrepr); 291 292 listRewind(monitors,&li); 293 while((ln = listNext(&li))) { 294 redisClient *monitor = ln->value; 295 addReply(monitor,cmdobj); 296 } 297 decrRefCount(cmdobj); 298 } 299 300 /* Feed the slave 
'c' with the replication backlog starting from the 301 * specified 'offset' up to the end of the backlog. */ 302 long long addReplyReplicationBacklog(redisClient *c, long long offset) { 303 long long j, skip, len; 304 305 redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset); 306 307 if (server.repl_backlog_histlen == 0) { 308 redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero"); 309 return 0; 310 } 311 312 redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld", 313 server.repl_backlog_size); 314 redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld", 315 server.repl_backlog_off); 316 redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld", 317 server.repl_backlog_histlen); 318 redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld", 319 server.repl_backlog_idx); 320 321 /* Compute the amount of bytes we need to discard. */ 322 skip = offset - server.repl_backlog_off; 323 redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip); 324 325 /* Point j to the oldest byte, that is actaully our 326 * server.repl_backlog_off byte. */ 327 j = (server.repl_backlog_idx + 328 (server.repl_backlog_size-server.repl_backlog_histlen)) % 329 server.repl_backlog_size; 330 redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j); 331 332 /* Discard the amount of data to seek to the specified 'offset'. */ 333 j = (j + skip) % server.repl_backlog_size; 334 335 /* Feed slave with data. Since it is a circular buffer we have to 336 * split the reply in two parts if we are cross-boundary. */ 337 len = server.repl_backlog_histlen - skip; 338 redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len); 339 while(len) { 340 long long thislen = 341 ((server.repl_backlog_size - j) < len) ? 
342 (server.repl_backlog_size - j) : len; 343 344 redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen); 345 addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); 346 len -= thislen; 347 j = 0; 348 } 349 return server.repl_backlog_histlen - skip; 350 } 351 352 /* This function handles the PSYNC command from the point of view of a 353 * master receiving a request for partial resynchronization. 354 * 355 * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed 356 * with the usual full resync. */ 357 int masterTryPartialResynchronization(redisClient *c) { 358 long long psync_offset, psync_len; 359 char *master_runid = c->argv[1]->ptr; 360 char buf[128]; 361 int buflen; 362 363 /* Is the runid of this master the same advertised by the wannabe slave 364 * via PSYNC? If runid changed this master is a different instance and 365 * there is no way to continue. */ 366 if (strcasecmp(master_runid, server.runid)) { 367 /* Run id "?" is used by slaves that want to force a full resync. */ 368 if (master_runid[0] != '?') { 369 redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: " 370 "Runid mismatch (Client asked for runid '%s', my runid is '%s')", 371 master_runid, server.runid); 372 } else { 373 redisLog(REDIS_NOTICE,"Full resync requested by slave %s", 374 replicationGetSlaveName(c)); 375 } 376 goto need_full_resync; 377 } 378 379 /* We still have the data our slave is asking for? 
*/ 380 if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != 381 REDIS_OK) goto need_full_resync; 382 if (!server.repl_backlog || 383 psync_offset < server.repl_backlog_off || 384 psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) 385 { 386 redisLog(REDIS_NOTICE, 387 "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset); 388 if (psync_offset > server.master_repl_offset) { 389 redisLog(REDIS_WARNING, 390 "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); 391 } 392 goto need_full_resync; 393 } 394 395 /* If we reached this point, we are able to perform a partial resync: 396 * 1) Set client state to make it a slave. 397 * 2) Inform the client we can continue with +CONTINUE 398 * 3) Send the backlog data (from the offset to the end) to the slave. */ 399 c->flags |= REDIS_SLAVE; 400 c->replstate = REDIS_REPL_ONLINE; 401 c->repl_ack_time = server.unixtime; 402 c->repl_put_online_on_ack = 0; 403 listAddNodeTail(server.slaves,c); 404 /* We can't use the connection buffers since they are used to accumulate 405 * new commands at this stage. But we are sure the socket send buffer is 406 * empty so this write will never fail actually. */ 407 buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); 408 if (write(c->fd,buf,buflen) != buflen) { 409 freeClientAsync(c); 410 return REDIS_OK; 411 } 412 psync_len = addReplyReplicationBacklog(c,psync_offset); 413 redisLog(REDIS_NOTICE, 414 "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", 415 replicationGetSlaveName(c), 416 psync_len, psync_offset); 417 /* Note that we don't need to set the selected DB at server.slaveseldb 418 * to -1 to force the master to emit SELECT, since the slave already 419 * has this state from the previous connection with the master. 
*/ 420 421 refreshGoodSlavesCount(); 422 return REDIS_OK; /* The caller can return, no full resync needed. */ 423 424 need_full_resync: 425 /* We need a full resync for some reason... notify the client. */ 426 psync_offset = server.master_repl_offset; 427 /* Add 1 to psync_offset if it the replication backlog does not exists 428 * as when it will be created later we'll increment the offset by one. */ 429 if (server.repl_backlog == NULL) psync_offset++; 430 /* Again, we can't use the connection buffers (see above). */ 431 buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", 432 server.runid,psync_offset); 433 if (write(c->fd,buf,buflen) != buflen) { 434 freeClientAsync(c); 435 return REDIS_OK; 436 } 437 return REDIS_ERR; 438 } 439 440 /* Start a BGSAVE for replication goals, which is, selecting the disk or 441 * socket target depending on the configuration, and making sure that 442 * the script cache is flushed before to start. 443 * 444 * Returns REDIS_OK on success or REDIS_ERR otherwise. */ 445 int startBgsaveForReplication(void) { 446 int retval; 447 448 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s", 449 server.repl_diskless_sync ? "slaves sockets" : "disk"); 450 451 if (server.repl_diskless_sync) 452 retval = rdbSaveToSlavesSockets(); 453 else 454 retval = rdbSaveBackground(server.rdb_filename); 455 456 /* Flush the script cache, since we need that slave differences are 457 * accumulated without requiring slaves to match our cached scripts. */ 458 if (retval == REDIS_OK) replicationScriptCacheFlush(); 459 return retval; 460 } 461 462 /* SYNC and PSYNC command implemenation. */ 463 void syncCommand(redisClient *c) { 464 /* ignore SYNC if already slave or in monitor mode */ 465 if (c->flags & REDIS_SLAVE) return; 466 467 /* Refuse SYNC requests if we are a slave but the link with our master 468 * is not ok... 
*/ 469 if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) { 470 addReplyError(c,"Can't SYNC while not connected with my master"); 471 return; 472 } 473 474 /* SYNC can't be issued when the server has pending data to send to 475 * the client about already issued commands. We need a fresh reply 476 * buffer registering the differences between the BGSAVE and the current 477 * dataset, so that we can copy to other slaves if needed. */ 478 if (listLength(c->reply) != 0 || c->bufpos != 0) { 479 addReplyError(c,"SYNC and PSYNC are invalid with pending output"); 480 return; 481 } 482 483 redisLog(REDIS_NOTICE,"Slave %s asks for synchronization", 484 replicationGetSlaveName(c)); 485 486 /* Try a partial resynchronization if this is a PSYNC command. 487 * If it fails, we continue with usual full resynchronization, however 488 * when this happens masterTryPartialResynchronization() already 489 * replied with: 490 * 491 * +FULLRESYNC <runid> <offset> 492 * 493 * So the slave knows the new runid and offset to try a PSYNC later 494 * if the connection with the master is lost. */ 495 if (!strcasecmp(c->argv[0]->ptr,"psync")) { 496 if (masterTryPartialResynchronization(c) == REDIS_OK) { 497 server.stat_sync_partial_ok++; 498 return; /* No full resync needed, return. */ 499 } else { 500 char *master_runid = c->argv[1]->ptr; 501 502 /* Increment stats for failed PSYNCs, but only if the 503 * runid is not "?", as this is used by slaves to force a full 504 * resync on purpose when they are not albe to partially 505 * resync. */ 506 if (master_runid[0] != '?') server.stat_sync_partial_err++; 507 } 508 } else { 509 /* If a slave uses SYNC, we are dealing with an old implementation 510 * of the replication protocol (like redis-cli --slave). Flag the client 511 * so that we don't expect to receive REPLCONF ACK feedbacks. */ 512 c->flags |= REDIS_PRE_PSYNC; 513 } 514 515 /* Full resynchronization. 
*/ 516 server.stat_sync_full++; 517 518 /* Here we need to check if there is a background saving operation 519 * in progress, or if it is required to start one */ 520 if (server.rdb_child_pid != -1 && 521 server.rdb_child_type == REDIS_RDB_CHILD_TYPE_DISK) 522 { 523 /* Ok a background save is in progress. Let's check if it is a good 524 * one for replication, i.e. if there is another slave that is 525 * registering differences since the server forked to save. */ 526 redisClient *slave; 527 listNode *ln; 528 listIter li; 529 530 listRewind(server.slaves,&li); 531 while((ln = listNext(&li))) { 532 slave = ln->value; 533 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break; 534 } 535 if (ln) { 536 /* Perfect, the server is already registering differences for 537 * another slave. Set the right state, and copy the buffer. */ 538 copyClientOutputBuffer(c,slave); 539 c->replstate = REDIS_REPL_WAIT_BGSAVE_END; 540 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); 541 } else { 542 /* No way, we need to wait for the next BGSAVE in order to 543 * register differences. */ 544 c->replstate = REDIS_REPL_WAIT_BGSAVE_START; 545 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); 546 } 547 } else if (server.rdb_child_pid != -1 && 548 server.rdb_child_type == REDIS_RDB_CHILD_TYPE_SOCKET) 549 { 550 /* There is an RDB child process but it is writing directly to 551 * children sockets. We need to wait for the next BGSAVE 552 * in order to synchronize. */ 553 c->replstate = REDIS_REPL_WAIT_BGSAVE_START; 554 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); 555 } else { 556 if (server.repl_diskless_sync) { 557 /* Diskless replication RDB child is created inside 558 * replicationCron() since we want to delay its start a 559 * few seconds to wait for more slaves to arrive. 
*/ 560 c->replstate = REDIS_REPL_WAIT_BGSAVE_START; 561 if (server.repl_diskless_sync_delay) 562 redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC"); 563 } else { 564 /* Ok we don't have a BGSAVE in progress, let's start one. */ 565 if (startBgsaveForReplication() != REDIS_OK) { 566 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); 567 addReplyError(c,"Unable to perform background save"); 568 return; 569 } 570 c->replstate = REDIS_REPL_WAIT_BGSAVE_END; 571 } 572 } 573 574 if (server.repl_disable_tcp_nodelay) 575 anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */ 576 c->repldbfd = -1; 577 c->flags |= REDIS_SLAVE; 578 server.slaveseldb = -1; /* Force to re-emit the SELECT command. */ 579 listAddNodeTail(server.slaves,c); 580 if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) 581 createReplicationBacklog(); 582 return; 583 } 584 585 /* REPLCONF <option> <value> <option> <value> ... 586 * This command is used by a slave in order to configure the replication 587 * process before starting it with the SYNC command. 588 * 589 * Currently the only use of this command is to communicate to the master 590 * what is the listening port of the Slave redis instance, so that the 591 * master can accurately list slaves and their listening ports in 592 * the INFO output. 593 * 594 * In the future the same command can be used in order to configure 595 * the replication to initiate an incremental replication instead of a 596 * full resync. */ 597 void replconfCommand(redisClient *c) { 598 int j; 599 600 if ((c->argc % 2) == 0) { 601 /* Number of arguments must be odd to make sure that every 602 * option has a corresponding value. */ 603 addReply(c,shared.syntaxerr); 604 return; 605 } 606 607 /* Process every option-value pair. 
*/ 608 for (j = 1; j < c->argc; j+=2) { 609 if (!strcasecmp(c->argv[j]->ptr,"listening-port")) { 610 long port; 611 612 if ((getLongFromObjectOrReply(c,c->argv[j+1], 613 &port,NULL) != REDIS_OK)) 614 return; 615 c->slave_listening_port = port; 616 } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { 617 /* REPLCONF ACK is used by slave to inform the master the amount 618 * of replication stream that it processed so far. It is an 619 * internal only command that normal clients should never use. */ 620 long long offset; 621 622 if (!(c->flags & REDIS_SLAVE)) return; 623 if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK)) 624 return; 625 if (offset > c->repl_ack_off) 626 c->repl_ack_off = offset; 627 c->repl_ack_time = server.unixtime; 628 /* If this was a diskless replication, we need to really put 629 * the slave online when the first ACK is received (which 630 * confirms slave is online and ready to get more data). */ 631 if (c->repl_put_online_on_ack && c->replstate == REDIS_REPL_ONLINE) 632 putSlaveOnline(c); 633 /* Note: this command does not reply anything! */ 634 return; 635 } else if (!strcasecmp(c->argv[j]->ptr,"getack")) { 636 /* REPLCONF GETACK is used in order to request an ACK ASAP 637 * to the slave. */ 638 if (server.masterhost && server.master) replicationSendAck(); 639 /* Note: this command does not reply anything! */ 640 } else { 641 addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", 642 (char*)c->argv[j]->ptr); 643 return; 644 } 645 } 646 addReply(c,shared.ok); 647 } 648 649 /* This function puts a slave in the online state, and should be called just 650 * after a slave received the RDB file for the initial synchronization, and 651 * we are finally ready to send the incremental stream of commands. 652 * 653 * It does a few things: 654 * 655 * 1) Put the slave in ONLINE state. 
656 * 2) Make sure the writable event is re-installed, since calling the SYNC 657 * command disables it, so that we can accumulate output buffer without 658 * sending it to the slave. 659 * 3) Update the count of good slaves. */ 660 void putSlaveOnline(redisClient *slave) { 661 slave->replstate = REDIS_REPL_ONLINE; 662 slave->repl_put_online_on_ack = 0; 663 slave->repl_ack_time = server.unixtime; 664 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, 665 sendReplyToClient, slave) == AE_ERR) { 666 redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno)); 667 freeClient(slave); 668 return; 669 } 670 refreshGoodSlavesCount(); 671 redisLog(REDIS_NOTICE,"Synchronization with slave %s succeeded", 672 replicationGetSlaveName(slave)); 673 } 674 675 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { 676 redisClient *slave = privdata; 677 REDIS_NOTUSED(el); 678 REDIS_NOTUSED(mask); 679 char buf[REDIS_IOBUF_LEN]; 680 ssize_t nwritten, buflen; 681 682 /* Before sending the RDB file, we send the preamble as configured by the 683 * replication process. Currently the preamble is just the bulk count of 684 * the file in the form "$<length>\r\n". */ 685 if (slave->replpreamble) { 686 nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble)); 687 if (nwritten == -1) { 688 redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s", 689 strerror(errno)); 690 freeClient(slave); 691 return; 692 } 693 server.stat_net_output_bytes += nwritten; 694 sdsrange(slave->replpreamble,nwritten,-1); 695 if (sdslen(slave->replpreamble) == 0) { 696 sdsfree(slave->replpreamble); 697 slave->replpreamble = NULL; 698 /* fall through sending data. */ 699 } else { 700 return; 701 } 702 } 703 704 /* If the preamble was already transfered, send the RDB bulk data. 
*/ 705 lseek(slave->repldbfd,slave->repldboff,SEEK_SET); 706 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); 707 if (buflen <= 0) { 708 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s", 709 (buflen == 0) ? "premature EOF" : strerror(errno)); 710 freeClient(slave); 711 return; 712 } 713 if ((nwritten = write(fd,buf,buflen)) == -1) { 714 if (errno != EAGAIN) { 715 redisLog(REDIS_WARNING,"Write error sending DB to slave: %s", 716 strerror(errno)); 717 freeClient(slave); 718 } 719 return; 720 } 721 slave->repldboff += nwritten; 722 server.stat_net_output_bytes += nwritten; 723 if (slave->repldboff == slave->repldbsize) { 724 close(slave->repldbfd); 725 slave->repldbfd = -1; 726 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 727 putSlaveOnline(slave); 728 } 729 } 730 731 /* This function is called at the end of every background saving, 732 * or when the replication RDB transfer strategy is modified from 733 * disk to socket or the other way around. 734 * 735 * The goal of this function is to handle slaves waiting for a successful 736 * background saving in order to perform non-blocking synchronization, and 737 * to schedule a new BGSAVE if there are slaves that attached while a 738 * BGSAVE was in progress, but it was not a good one for replication (no 739 * other slave was accumulating differences). 740 * 741 * The argument bgsaveerr is REDIS_OK if the background saving succeeded 742 * otherwise REDIS_ERR is passed to the function. 743 * The 'type' argument is the type of the child that terminated 744 * (if it had a disk or socket target). 
*/ 745 void updateSlavesWaitingBgsave(int bgsaveerr, int type) { 746 listNode *ln; 747 int startbgsave = 0; 748 listIter li; 749 750 listRewind(server.slaves,&li); 751 while((ln = listNext(&li))) { 752 redisClient *slave = ln->value; 753 754 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { 755 startbgsave = 1; 756 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; 757 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { 758 struct redis_stat buf; 759 760 /* If this was an RDB on disk save, we have to prepare to send 761 * the RDB from disk to the slave socket. Otherwise if this was 762 * already an RDB -> Slaves socket transfer, used in the case of 763 * diskless replication, our work is trivial, we can just put 764 * the slave online. */ 765 if (type == REDIS_RDB_CHILD_TYPE_SOCKET) { 766 redisLog(REDIS_NOTICE, 767 "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming", 768 replicationGetSlaveName(slave)); 769 /* Note: we wait for a REPLCONF ACK message from slave in 770 * order to really put it online (install the write handler 771 * so that the accumulated data can be transfered). However 772 * we change the replication state ASAP, since our slave 773 * is technically online now. */ 774 slave->replstate = REDIS_REPL_ONLINE; 775 slave->repl_put_online_on_ack = 1; 776 } else { 777 if (bgsaveerr != REDIS_OK) { 778 freeClient(slave); 779 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); 780 continue; 781 } 782 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || 783 redis_fstat(slave->repldbfd,&buf) == -1) { 784 freeClient(slave); 785 redisLog(REDIS_WARNING,"SYNC failed. 
Can't open/stat DB after BGSAVE: %s", strerror(errno)); 786 continue; 787 } 788 slave->repldboff = 0; 789 slave->repldbsize = buf.st_size; 790 slave->replstate = REDIS_REPL_SEND_BULK; 791 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", 792 (unsigned long long) slave->repldbsize); 793 794 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 795 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { 796 freeClient(slave); 797 continue; 798 } 799 } 800 } 801 } 802 if (startbgsave) { 803 if (startBgsaveForReplication() != REDIS_OK) { 804 listIter li; 805 806 listRewind(server.slaves,&li); 807 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed"); 808 while((ln = listNext(&li))) { 809 redisClient *slave = ln->value; 810 811 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) 812 freeClient(slave); 813 } 814 } 815 } 816 } 817 818 /* ----------------------------------- SLAVE -------------------------------- */ 819 820 /* Abort the async download of the bulk dataset while SYNC-ing with master */ 821 void replicationAbortSyncTransfer(void) { 822 redisAssert(server.repl_state == REDIS_REPL_TRANSFER); 823 824 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); 825 close(server.repl_transfer_s); 826 close(server.repl_transfer_fd); 827 unlink(server.repl_transfer_tmpfile); 828 zfree(server.repl_transfer_tmpfile); 829 server.repl_state = REDIS_REPL_CONNECT; 830 } 831 832 /* Avoid the master to detect the slave is timing out while loading the 833 * RDB file in initial synchronization. We send a single newline character 834 * that is valid protocol but is guaranteed to either be sent entierly or 835 * not, since the byte is indivisible. 836 * 837 * The function is called in two contexts: while we flush the current 838 * data with emptyDb(), and while we load the new data received as an 839 * RDB file from the master. 
*/ 840 void replicationSendNewlineToMaster(void) { 841 static time_t newline_sent; 842 if (time(NULL) != newline_sent) { 843 newline_sent = time(NULL); 844 if (write(server.repl_transfer_s,"\n",1) == -1) { 845 /* Pinging back in this stage is best-effort. */ 846 } 847 } 848 } 849 850 /* Callback used by emptyDb() while flushing away old data to load 851 * the new dataset received by the master. */ 852 void replicationEmptyDbCallback(void *privdata) { 853 REDIS_NOTUSED(privdata); 854 replicationSendNewlineToMaster(); 855 } 856 857 /* Once we have a link with the master and the synchroniziation was 858 * performed, this function materializes the master client we store 859 * at server.master, starting from the specified file descriptor. */ 860 void replicationCreateMasterClient(int fd) { 861 server.master = createClient(fd); 862 server.master->flags |= REDIS_MASTER; 863 server.master->authenticated = 1; 864 server.repl_state = REDIS_REPL_CONNECTED; 865 server.master->reploff = server.repl_master_initial_offset; 866 memcpy(server.master->replrunid, server.repl_master_runid, 867 sizeof(server.repl_master_runid)); 868 /* If master offset is set to -1, this master is old and is not 869 * PSYNC capable, so we flag it accordingly. */ 870 if (server.master->reploff == -1) 871 server.master->flags |= REDIS_PRE_PSYNC; 872 } 873 874 /* Asynchronously read the SYNC payload we receive from a master */ 875 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ 876 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { 877 char buf[4096]; 878 ssize_t nread, readlen; 879 off_t left; 880 REDIS_NOTUSED(el); 881 REDIS_NOTUSED(privdata); 882 REDIS_NOTUSED(mask); 883 884 /* Static vars used to hold the EOF mark, and the last bytes received 885 * form the server: when they match, we reached the end of the transfer. 
*/ 886 static char eofmark[REDIS_RUN_ID_SIZE]; 887 static char lastbytes[REDIS_RUN_ID_SIZE]; 888 static int usemark = 0; 889 890 /* If repl_transfer_size == -1 we still have to read the bulk length 891 * from the master reply. */ 892 if (server.repl_transfer_size == -1) { 893 if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { 894 redisLog(REDIS_WARNING, 895 "I/O error reading bulk count from MASTER: %s", 896 strerror(errno)); 897 goto error; 898 } 899 900 if (buf[0] == '-') { 901 redisLog(REDIS_WARNING, 902 "MASTER aborted replication with an error: %s", 903 buf+1); 904 goto error; 905 } else if (buf[0] == '\0') { 906 /* At this stage just a newline works as a PING in order to take 907 * the connection live. So we refresh our last interaction 908 * timestamp. */ 909 server.repl_transfer_lastio = server.unixtime; 910 return; 911 } else if (buf[0] != '$') { 912 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); 913 goto error; 914 } 915 916 /* There are two possible forms for the bulk payload. One is the 917 * usual $<count> bulk format. The other is used for diskless transfers 918 * when the master does not know beforehand the size of the file to 919 * transfer. In the latter case, the following format is used: 920 * 921 * $EOF:<40 bytes delimiter> 922 * 923 * At the end of the file the announced delimiter is transmitted. The 924 * delimiter is long and random enough that the probability of a 925 * collision with the actual file content can be ignored. */ 926 if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) { 927 usemark = 1; 928 memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE); 929 memset(lastbytes,0,REDIS_RUN_ID_SIZE); 930 /* Set any repl_transfer_size to avoid entering this code path 931 * at the next call. 
*/ 932 server.repl_transfer_size = 0; 933 redisLog(REDIS_NOTICE, 934 "MASTER <-> SLAVE sync: receiving streamed RDB from master"); 935 } else { 936 usemark = 0; 937 server.repl_transfer_size = strtol(buf+1,NULL,10); 938 redisLog(REDIS_NOTICE, 939 "MASTER <-> SLAVE sync: receiving %lld bytes from master", 940 (long long) server.repl_transfer_size); 941 } 942 return; 943 } 944 945 /* Read bulk data */ 946 if (usemark) { 947 readlen = sizeof(buf); 948 } else { 949 left = server.repl_transfer_size - server.repl_transfer_read; 950 readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); 951 } 952 953 nread = read(fd,buf,readlen); 954 if (nread <= 0) { 955 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", 956 (nread == -1) ? strerror(errno) : "connection lost"); 957 replicationAbortSyncTransfer(); 958 return; 959 } 960 server.stat_net_input_bytes += nread; 961 962 /* When a mark is used, we want to detect EOF asap in order to avoid 963 * writing the EOF mark into the file... */ 964 int eof_reached = 0; 965 966 if (usemark) { 967 /* Update the last bytes array, and check if it matches our delimiter.*/ 968 if (nread >= REDIS_RUN_ID_SIZE) { 969 memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE); 970 } else { 971 int rem = REDIS_RUN_ID_SIZE-nread; 972 memmove(lastbytes,lastbytes+nread,rem); 973 memcpy(lastbytes+rem,buf,nread); 974 } 975 if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1; 976 } 977 978 server.repl_transfer_lastio = server.unixtime; 979 if (write(server.repl_transfer_fd,buf,nread) != nread) { 980 redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); 981 goto error; 982 } 983 server.repl_transfer_read += nread; 984 985 /* Delete the last 40 bytes from the file if we reached EOF. 
*/ 986 if (usemark && eof_reached) { 987 if (ftruncate(server.repl_transfer_fd, 988 server.repl_transfer_read - REDIS_RUN_ID_SIZE) == -1) 989 { 990 redisLog(REDIS_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); 991 goto error; 992 } 993 } 994 995 /* Sync data on disk from time to time, otherwise at the end of the transfer 996 * we may suffer a big delay as the memory buffers are copied into the 997 * actual disk. */ 998 if (server.repl_transfer_read >= 999 server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) 1000 { 1001 off_t sync_size = server.repl_transfer_read - 1002 server.repl_transfer_last_fsync_off; 1003 rdb_fsync_range(server.repl_transfer_fd, 1004 server.repl_transfer_last_fsync_off, sync_size); 1005 server.repl_transfer_last_fsync_off += sync_size; 1006 } 1007 1008 /* Check if the transfer is now complete */ 1009 if (!usemark) { 1010 if (server.repl_transfer_read == server.repl_transfer_size) 1011 eof_reached = 1; 1012 } 1013 1014 if (eof_reached) { 1015 if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { 1016 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); 1017 replicationAbortSyncTransfer(); 1018 return; 1019 } 1020 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data"); 1021 signalFlushedDb(-1); 1022 emptyDb(replicationEmptyDbCallback); 1023 /* Before loading the DB into memory we need to delete the readable 1024 * handler, otherwise it will get called recursively since 1025 * rdbLoad() will call the event loop to process events from time to 1026 * time for non blocking loading. 
*/ 1027 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); 1028 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); 1029 if (rdbLoad(server.rdb_filename) != REDIS_OK) { 1030 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); 1031 replicationAbortSyncTransfer(); 1032 return; 1033 } 1034 /* Final setup of the connected slave <- master link */ 1035 zfree(server.repl_transfer_tmpfile); 1036 close(server.repl_transfer_fd); 1037 replicationCreateMasterClient(server.repl_transfer_s); 1038 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success"); 1039 /* Restart the AOF subsystem now that we finished the sync. This 1040 * will trigger an AOF rewrite, and when done will start appending 1041 * to the new file. */ 1042 if (server.aof_state != REDIS_AOF_OFF) { 1043 int retry = 10; 1044 1045 stopAppendOnly(); 1046 while (retry-- && startAppendOnly() == REDIS_ERR) { 1047 redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second."); 1048 sleep(1); 1049 } 1050 if (!retry) { 1051 redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now."); 1052 exit(1); 1053 } 1054 } 1055 } 1056 1057 return; 1058 1059 error: 1060 replicationAbortSyncTransfer(); 1061 return; 1062 } 1063 1064 /* Send a synchronous command to the master. Used to send AUTH and 1065 * REPLCONF commands before starting the replication with SYNC. 1066 * 1067 * The command returns an sds string representing the result of the 1068 * operation. On error the first byte is a "-". 1069 */ 1070 char *sendSynchronousCommand(int fd, ...) { 1071 va_list ap; 1072 sds cmd = sdsempty(); 1073 char *arg, buf[256]; 1074 1075 /* Create the command to send to the master, we use simple inline 1076 * protocol for simplicity as currently we only send simple strings. 
*/ 1077 va_start(ap,fd); 1078 while(1) { 1079 arg = va_arg(ap, char*); 1080 if (arg == NULL) break; 1081 1082 if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1); 1083 cmd = sdscat(cmd,arg); 1084 } 1085 cmd = sdscatlen(cmd,"\r\n",2); 1086 1087 /* Transfer command to the server. */ 1088 if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) { 1089 sdsfree(cmd); 1090 return sdscatprintf(sdsempty(),"-Writing to master: %s", 1091 strerror(errno)); 1092 } 1093 sdsfree(cmd); 1094 1095 /* Read the reply from the server. */ 1096 if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1) 1097 { 1098 return sdscatprintf(sdsempty(),"-Reading from master: %s", 1099 strerror(errno)); 1100 } 1101 return sdsnew(buf); 1102 } 1103 1104 /* Try a partial resynchronization with the master if we are about to reconnect. 1105 * If there is no cached master structure, at least try to issue a 1106 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC 1107 * command in order to obtain the master run id and the master replication 1108 * global offset. 1109 * 1110 * This function is designed to be called from syncWithMaster(), so the 1111 * following assumptions are made: 1112 * 1113 * 1) We pass the function an already connected socket "fd". 1114 * 2) This function does not close the file descriptor "fd". However in case 1115 * of successful partial resynchronization, the function will reuse 1116 * 'fd' as file descriptor of the server.master client structure. 1117 * 1118 * The function returns: 1119 * 1120 * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue. 1121 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed. 1122 * In this case the master run_id and global replication 1123 * offset is saved. 1124 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and 1125 * the caller should fall back to SYNC. 
1126 */ 1127 1128 #define PSYNC_CONTINUE 0 1129 #define PSYNC_FULLRESYNC 1 1130 #define PSYNC_NOT_SUPPORTED 2 1131 int slaveTryPartialResynchronization(int fd) { 1132 char *psync_runid; 1133 char psync_offset[32]; 1134 sds reply; 1135 1136 /* Initially set repl_master_initial_offset to -1 to mark the current 1137 * master run_id and offset as not valid. Later if we'll be able to do 1138 * a FULL resync using the PSYNC command we'll set the offset at the 1139 * right value, so that this information will be propagated to the 1140 * client structure representing the master into server.master. */ 1141 server.repl_master_initial_offset = -1; 1142 1143 if (server.cached_master) { 1144 psync_runid = server.cached_master->replrunid; 1145 snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); 1146 redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); 1147 } else { 1148 redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)"); 1149 psync_runid = "?"; 1150 memcpy(psync_offset,"-1",3); 1151 } 1152 1153 /* Issue the PSYNC command */ 1154 reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL); 1155 1156 if (!strncmp(reply,"+FULLRESYNC",11)) { 1157 char *runid = NULL, *offset = NULL; 1158 1159 /* FULL RESYNC, parse the reply in order to extract the run id 1160 * and the replication offset. */ 1161 runid = strchr(reply,' '); 1162 if (runid) { 1163 runid++; 1164 offset = strchr(runid,' '); 1165 if (offset) offset++; 1166 } 1167 if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) { 1168 redisLog(REDIS_WARNING, 1169 "Master replied with wrong +FULLRESYNC syntax."); 1170 /* This is an unexpected condition, actually the +FULLRESYNC 1171 * reply means that the master supports PSYNC, but the reply 1172 * format seems wrong. To stay safe we blank the master 1173 * runid to make sure next PSYNCs will fail. 
*/ 1174 memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1); 1175 } else { 1176 memcpy(server.repl_master_runid, runid, offset-runid-1); 1177 server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0'; 1178 server.repl_master_initial_offset = strtoll(offset,NULL,10); 1179 redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld", 1180 server.repl_master_runid, 1181 server.repl_master_initial_offset); 1182 } 1183 /* We are going to full resync, discard the cached master structure. */ 1184 replicationDiscardCachedMaster(); 1185 sdsfree(reply); 1186 return PSYNC_FULLRESYNC; 1187 } 1188 1189 if (!strncmp(reply,"+CONTINUE",9)) { 1190 /* Partial resync was accepted, set the replication state accordingly */ 1191 redisLog(REDIS_NOTICE, 1192 "Successful partial resynchronization with master."); 1193 sdsfree(reply); 1194 replicationResurrectCachedMaster(fd); 1195 return PSYNC_CONTINUE; 1196 } 1197 1198 /* If we reach this point we receied either an error since the master does 1199 * not understand PSYNC, or an unexpected reply from the master. 1200 * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */ 1201 1202 if (strncmp(reply,"-ERR",4)) { 1203 /* If it's not an error, log the unexpected event. */ 1204 redisLog(REDIS_WARNING, 1205 "Unexpected reply to PSYNC from master: %s", reply); 1206 } else { 1207 redisLog(REDIS_NOTICE, 1208 "Master does not support PSYNC or is in " 1209 "error state (reply: %s)", reply); 1210 } 1211 sdsfree(reply); 1212 replicationDiscardCachedMaster(); 1213 return PSYNC_NOT_SUPPORTED; 1214 } 1215 1216 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { 1217 char tmpfile[256], *err; 1218 int dfd, maxtries = 5; 1219 int sockerr = 0, psync_result; 1220 socklen_t errlen = sizeof(sockerr); 1221 REDIS_NOTUSED(el); 1222 REDIS_NOTUSED(privdata); 1223 REDIS_NOTUSED(mask); 1224 1225 /* If this event fired after the user turned the instance into a master 1226 * with SLAVEOF NO ONE we must just return ASAP. 
*/ 1227 if (server.repl_state == REDIS_REPL_NONE) { 1228 close(fd); 1229 return; 1230 } 1231 1232 /* Check for errors in the socket. */ 1233 if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) 1234 sockerr = errno; 1235 if (sockerr) { 1236 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1237 redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s", 1238 strerror(sockerr)); 1239 goto error; 1240 } 1241 1242 /* If we were connecting, it's time to send a non blocking PING, we want to 1243 * make sure the master is able to reply before going into the actual 1244 * replication process where we have long timeouts in the order of 1245 * seconds (in the meantime the slave would block). */ 1246 if (server.repl_state == REDIS_REPL_CONNECTING) { 1247 redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event."); 1248 /* Delete the writable event so that the readable event remains 1249 * registered and we can wait for the PONG reply. */ 1250 aeDeleteFileEvent(server.el,fd,AE_WRITABLE); 1251 server.repl_state = REDIS_REPL_RECEIVE_PONG; 1252 /* Send the PING, don't check for errors at all, we have the timeout 1253 * that will take care about this. */ 1254 syncWrite(fd,"PING\r\n",6,100); 1255 return; 1256 } 1257 1258 /* Receive the PONG command. */ 1259 if (server.repl_state == REDIS_REPL_RECEIVE_PONG) { 1260 char buf[1024]; 1261 1262 /* Delete the readable event, we no longer need it now that there is 1263 * the PING reply to read. */ 1264 aeDeleteFileEvent(server.el,fd,AE_READABLE); 1265 1266 /* Read the reply with explicit timeout. */ 1267 buf[0] = '\0'; 1268 if (syncReadLine(fd,buf,sizeof(buf), 1269 server.repl_syncio_timeout*1000) == -1) 1270 { 1271 redisLog(REDIS_WARNING, 1272 "I/O error reading PING reply from master: %s", 1273 strerror(errno)); 1274 goto error; 1275 } 1276 1277 /* We accept only two replies as valid, a positive +PONG reply 1278 * (we just check for "+") or an authentication error. 
1279 * Note that older versions of Redis replied with "operation not 1280 * permitted" instead of using a proper error code, so we test 1281 * both. */ 1282 if (buf[0] != '+' && 1283 strncmp(buf,"-NOAUTH",7) != 0 && 1284 strncmp(buf,"-ERR operation not permitted",28) != 0) 1285 { 1286 redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf); 1287 goto error; 1288 } else { 1289 redisLog(REDIS_NOTICE, 1290 "Master replied to PING, replication can continue..."); 1291 } 1292 } 1293 1294 /* AUTH with the master if required. */ 1295 if(server.masterauth) { 1296 err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL); 1297 if (err[0] == '-') { 1298 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err); 1299 sdsfree(err); 1300 goto error; 1301 } 1302 sdsfree(err); 1303 } 1304 1305 /* Set the slave port, so that Master's INFO command can list the 1306 * slave listening port correctly. */ 1307 { 1308 sds port = sdsfromlonglong(server.port); 1309 err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port, 1310 NULL); 1311 sdsfree(port); 1312 /* Ignore the error if any, not all the Redis versions support 1313 * REPLCONF listening-port. */ 1314 if (err[0] == '-') { 1315 redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err); 1316 } 1317 sdsfree(err); 1318 } 1319 1320 /* Try a partial resynchonization. If we don't have a cached master 1321 * slaveTryPartialResynchronization() will at least try to use PSYNC 1322 * to start a full resynchronization so that we get the master run id 1323 * and the global offset, to try a partial resync at the next 1324 * reconnection attempt. */ 1325 psync_result = slaveTryPartialResynchronization(fd); 1326 if (psync_result == PSYNC_CONTINUE) { 1327 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization."); 1328 return; 1329 } 1330 1331 /* Fall back to SYNC if needed. 
Otherwise psync_result == PSYNC_FULLRESYNC 1332 * and the server.repl_master_runid and repl_master_initial_offset are 1333 * already populated. */ 1334 if (psync_result == PSYNC_NOT_SUPPORTED) { 1335 redisLog(REDIS_NOTICE,"Retrying with SYNC..."); 1336 if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { 1337 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s", 1338 strerror(errno)); 1339 goto error; 1340 } 1341 } 1342 1343 /* Prepare a suitable temp file for bulk transfer */ 1344 while(maxtries--) { 1345 snprintf(tmpfile,256, 1346 "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); 1347 dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); 1348 if (dfd != -1) break; 1349 sleep(1); 1350 } 1351 if (dfd == -1) { 1352 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno)); 1353 goto error; 1354 } 1355 1356 /* Setup the non blocking download of the bulk file. */ 1357 if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) 1358 == AE_ERR) 1359 { 1360 redisLog(REDIS_WARNING, 1361 "Can't create readable event for SYNC: %s (fd=%d)", 1362 strerror(errno),fd); 1363 goto error; 1364 } 1365 1366 server.repl_state = REDIS_REPL_TRANSFER; 1367 server.repl_transfer_size = -1; 1368 server.repl_transfer_read = 0; 1369 server.repl_transfer_last_fsync_off = 0; 1370 server.repl_transfer_fd = dfd; 1371 server.repl_transfer_lastio = server.unixtime; 1372 server.repl_transfer_tmpfile = zstrdup(tmpfile); 1373 return; 1374 1375 error: 1376 close(fd); 1377 server.repl_transfer_s = -1; 1378 server.repl_state = REDIS_REPL_CONNECT; 1379 return; 1380 } 1381 1382 int connectWithMaster(void) { 1383 int fd; 1384 1385 fd = anetTcpNonBlockBindConnect(NULL, 1386 server.masterhost,server.masterport,REDIS_BIND_ADDR); 1387 if (fd == -1) { 1388 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s", 1389 strerror(errno)); 1390 return REDIS_ERR; 1391 } 1392 1393 if 
(aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == 1394 AE_ERR) 1395 { 1396 close(fd); 1397 redisLog(REDIS_WARNING,"Can't create readable event for SYNC"); 1398 return REDIS_ERR; 1399 } 1400 1401 server.repl_transfer_lastio = server.unixtime; 1402 server.repl_transfer_s = fd; 1403 server.repl_state = REDIS_REPL_CONNECTING; 1404 return REDIS_OK; 1405 } 1406 1407 /* This function can be called when a non blocking connection is currently 1408 * in progress to undo it. */ 1409 void undoConnectWithMaster(void) { 1410 int fd = server.repl_transfer_s; 1411 1412 redisAssert(server.repl_state == REDIS_REPL_CONNECTING || 1413 server.repl_state == REDIS_REPL_RECEIVE_PONG); 1414 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1415 close(fd); 1416 server.repl_transfer_s = -1; 1417 server.repl_state = REDIS_REPL_CONNECT; 1418 } 1419 1420 /* This function aborts a non blocking replication attempt if there is one 1421 * in progress, by canceling the non-blocking connect attempt or 1422 * the initial bulk transfer. 1423 * 1424 * If there was a replication handshake in progress 1 is returned and 1425 * the replication state (server.repl_state) set to REDIS_REPL_CONNECT. 1426 * 1427 * Otherwise zero is returned and no operation is perforemd at all. */ 1428 int cancelReplicationHandshake(void) { 1429 if (server.repl_state == REDIS_REPL_TRANSFER) { 1430 replicationAbortSyncTransfer(); 1431 } else if (server.repl_state == REDIS_REPL_CONNECTING || 1432 server.repl_state == REDIS_REPL_RECEIVE_PONG) 1433 { 1434 undoConnectWithMaster(); 1435 } else { 1436 return 0; 1437 } 1438 return 1; 1439 } 1440 1441 /* Set replication to the specified master address and port. */ 1442 void replicationSetMaster(char *ip, int port) { 1443 sdsfree(server.masterhost); 1444 server.masterhost = sdsnew(ip); 1445 server.masterport = port; 1446 if (server.master) freeClient(server.master); 1447 disconnectSlaves(); /* Force our slaves to resync with us as well. 
*/ 1448 replicationDiscardCachedMaster(); /* Don't try a PSYNC. */ 1449 freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ 1450 cancelReplicationHandshake(); 1451 server.repl_state = REDIS_REPL_CONNECT; 1452 server.master_repl_offset = 0; 1453 server.repl_down_since = 0; 1454 } 1455 1456 /* Cancel replication, setting the instance as a master itself. */ 1457 void replicationUnsetMaster(void) { 1458 if (server.masterhost == NULL) return; /* Nothing to do. */ 1459 sdsfree(server.masterhost); 1460 server.masterhost = NULL; 1461 if (server.master) { 1462 if (listLength(server.slaves) == 0) { 1463 /* If this instance is turned into a master and there are no 1464 * slaves, it inherits the replication offset from the master. 1465 * Under certain conditions this makes replicas comparable by 1466 * replication offset to understand what is the most updated. */ 1467 server.master_repl_offset = server.master->reploff; 1468 freeReplicationBacklog(); 1469 } 1470 freeClient(server.master); 1471 } 1472 replicationDiscardCachedMaster(); 1473 cancelReplicationHandshake(); 1474 server.repl_state = REDIS_REPL_NONE; 1475 } 1476 1477 void slaveofCommand(redisClient *c) { 1478 /* SLAVEOF is not allowed in cluster mode as replication is automatically 1479 * configured using the current address of the master node. */ 1480 if (server.cluster_enabled) { 1481 addReplyError(c,"SLAVEOF not allowed in cluster mode."); 1482 return; 1483 } 1484 1485 /* The special host/port combination "NO" "ONE" turns the instance 1486 * into a master. Otherwise the new master address is set. 
*/ 1487 if (!strcasecmp(c->argv[1]->ptr,"no") && 1488 !strcasecmp(c->argv[2]->ptr,"one")) { 1489 if (server.masterhost) { 1490 replicationUnsetMaster(); 1491 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)"); 1492 } 1493 } else { 1494 long port; 1495 1496 if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK)) 1497 return; 1498 1499 /* Check if we are already attached to the specified slave */ 1500 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) 1501 && server.masterport == port) { 1502 redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed."); 1503 addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); 1504 return; 1505 } 1506 /* There was no previous master or the user specified a different one, 1507 * we can continue. */ 1508 replicationSetMaster(c->argv[1]->ptr, port); 1509 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)", 1510 server.masterhost, server.masterport); 1511 } 1512 addReply(c,shared.ok); 1513 } 1514 1515 /* ROLE command: provide information about the role of the instance 1516 * (master or slave) and additional information related to replication 1517 * in an easy to process format. 
*/ 1518 void roleCommand(redisClient *c) { 1519 if (server.masterhost == NULL) { 1520 listIter li; 1521 listNode *ln; 1522 void *mbcount; 1523 int slaves = 0; 1524 1525 addReplyMultiBulkLen(c,3); 1526 addReplyBulkCBuffer(c,"master",6); 1527 addReplyLongLong(c,server.master_repl_offset); 1528 mbcount = addDeferredMultiBulkLength(c); 1529 listRewind(server.slaves,&li); 1530 while((ln = listNext(&li))) { 1531 redisClient *slave = ln->value; 1532 char ip[REDIS_IP_STR_LEN]; 1533 1534 if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1) continue; 1535 if (slave->replstate != REDIS_REPL_ONLINE) continue; 1536 addReplyMultiBulkLen(c,3); 1537 addReplyBulkCString(c,ip); 1538 addReplyBulkLongLong(c,slave->slave_listening_port); 1539 addReplyBulkLongLong(c,slave->repl_ack_off); 1540 slaves++; 1541 } 1542 setDeferredMultiBulkLength(c,mbcount,slaves); 1543 } else { 1544 char *slavestate = NULL; 1545 1546 addReplyMultiBulkLen(c,5); 1547 addReplyBulkCBuffer(c,"slave",5); 1548 addReplyBulkCString(c,server.masterhost); 1549 addReplyLongLong(c,server.masterport); 1550 switch(server.repl_state) { 1551 case REDIS_REPL_NONE: slavestate = "none"; break; 1552 case REDIS_REPL_CONNECT: slavestate = "connect"; break; 1553 case REDIS_REPL_CONNECTING: slavestate = "connecting"; break; 1554 case REDIS_REPL_RECEIVE_PONG: /* see next */ 1555 case REDIS_REPL_TRANSFER: slavestate = "sync"; break; 1556 case REDIS_REPL_CONNECTED: slavestate = "connected"; break; 1557 default: slavestate = "unknown"; break; 1558 } 1559 addReplyBulkCString(c,slavestate); 1560 addReplyLongLong(c,server.master ? server.master->reploff : -1); 1561 } 1562 } 1563 1564 /* Send a REPLCONF ACK command to the master to inform it about the current 1565 * processed offset. If we are not connected with a master, the command has 1566 * no effects. 
*/ 1567 void replicationSendAck(void) { 1568 redisClient *c = server.master; 1569 1570 if (c != NULL) { 1571 c->flags |= REDIS_MASTER_FORCE_REPLY; 1572 addReplyMultiBulkLen(c,3); 1573 addReplyBulkCString(c,"REPLCONF"); 1574 addReplyBulkCString(c,"ACK"); 1575 addReplyBulkLongLong(c,c->reploff); 1576 c->flags &= ~REDIS_MASTER_FORCE_REPLY; 1577 } 1578 } 1579 1580 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */ 1581 1582 /* In order to implement partial synchronization we need to be able to cache 1583 * our master's client structure after a transient disconnection. 1584 * It is cached into server.cached_master and flushed away using the following 1585 * functions. */ 1586 1587 /* This function is called by freeClient() in order to cache the master 1588 * client structure instead of destryoing it. freeClient() will return 1589 * ASAP after this function returns, so every action needed to avoid problems 1590 * with a client that is really "suspended" has to be done by this function. 1591 * 1592 * The other functions that will deal with the cached master are: 1593 * 1594 * replicationDiscardCachedMaster() that will make sure to kill the client 1595 * as for some reason we don't want to use it in the future. 1596 * 1597 * replicationResurrectCachedMaster() that is used after a successful PSYNC 1598 * handshake in order to reactivate the cached master. 1599 */ 1600 void replicationCacheMaster(redisClient *c) { 1601 listNode *ln; 1602 1603 redisAssert(server.master != NULL && server.cached_master == NULL); 1604 redisLog(REDIS_NOTICE,"Caching the disconnected master state."); 1605 1606 /* Remove from the list of clients, we don't want this client to be 1607 * listed by CLIENT LIST or processed in any way by batch operations. */ 1608 ln = listSearchKey(server.clients,c); 1609 redisAssert(ln != NULL); 1610 listDelNode(server.clients,ln); 1611 1612 /* Save the master. 
Server.master will be set to null later by 1613 * replicationHandleMasterDisconnection(). */ 1614 server.cached_master = server.master; 1615 1616 /* Remove the event handlers and close the socket. We'll later reuse 1617 * the socket of the new connection with the master during PSYNC. */ 1618 aeDeleteFileEvent(server.el,c->fd,AE_READABLE); 1619 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); 1620 close(c->fd); 1621 1622 /* Set fd to -1 so that we can safely call freeClient(c) later. */ 1623 c->fd = -1; 1624 1625 /* Invalidate the Peer ID cache. */ 1626 if (c->peerid) { 1627 sdsfree(c->peerid); 1628 c->peerid = NULL; 1629 } 1630 1631 /* Caching the master happens instead of the actual freeClient() call, 1632 * so make sure to adjust the replication state. This function will 1633 * also set server.master to NULL. */ 1634 replicationHandleMasterDisconnection(); 1635 } 1636 1637 /* Free a cached master, called when there are no longer the conditions for 1638 * a partial resync on reconnection. */ 1639 void replicationDiscardCachedMaster(void) { 1640 if (server.cached_master == NULL) return; 1641 1642 redisLog(REDIS_NOTICE,"Discarding previously cached master state."); 1643 server.cached_master->flags &= ~REDIS_MASTER; 1644 freeClient(server.cached_master); 1645 server.cached_master = NULL; 1646 } 1647 1648 /* Turn the cached master into the current master, using the file descriptor 1649 * passed as argument as the socket for the new master. 1650 * 1651 * This function is called when successfully setup a partial resynchronization 1652 * so the stream of data that we'll receive will start from were this 1653 * master left. 
*/ 1654 void replicationResurrectCachedMaster(int newfd) { 1655 server.master = server.cached_master; 1656 server.cached_master = NULL; 1657 server.master->fd = newfd; 1658 server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP); 1659 server.master->authenticated = 1; 1660 server.master->lastinteraction = server.unixtime; 1661 server.repl_state = REDIS_REPL_CONNECTED; 1662 1663 /* Re-add to the list of clients. */ 1664 listAddNodeTail(server.clients,server.master); 1665 if (aeCreateFileEvent(server.el, newfd, AE_READABLE, 1666 readQueryFromClient, server.master)) { 1667 redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); 1668 freeClientAsync(server.master); /* Close ASAP. */ 1669 } 1670 1671 /* We may also need to install the write handler as well if there is 1672 * pending data in the write buffers. */ 1673 if (server.master->bufpos || listLength(server.master->reply)) { 1674 if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, 1675 sendReplyToClient, server.master)) { 1676 redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); 1677 freeClientAsync(server.master); /* Close ASAP. */ 1678 } 1679 } 1680 } 1681 1682 /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */ 1683 1684 /* This function counts the number of slaves with lag <= min-slaves-max-lag. 1685 * If the option is active, the server will prevent writes if there are not 1686 * enough connected slaves with the specified lag (or less). 
*/
void refreshGoodSlavesCount(void) {
    listIter iter;
    listNode *node;
    int count = 0;

    /* Both options must be set for the count to be maintained; otherwise
     * the previously stored value is left untouched. */
    if (!server.repl_min_slaves_to_write || !server.repl_min_slaves_max_lag)
        return;

    listRewind(server.slaves,&iter);
    while ((node = listNext(&iter)) != NULL) {
        redisClient *replica = node->value;

        /* Only fully synchronized slaves can count as "good". */
        if (replica->replstate != REDIS_REPL_ONLINE) continue;

        /* Lag is the number of seconds since the last ACK we received. */
        time_t lag = server.unixtime - replica->repl_ack_time;
        if (lag <= server.repl_min_slaves_max_lag) count++;
    }
    server.repl_good_slaves_count = count;
}

/* ----------------------- REPLICATION SCRIPT CACHE --------------------------
 * The goal of this code is to keep track of scripts already sent to every
 * connected slave, in order to be able to replicate EVALSHA as it is without
 * translating it to EVAL every time it is possible.
 *
 * We use a capped collection implemented by a hash table for fast lookup
 * of scripts we can send as EVALSHA, plus a linked list that is used for
 * eviction of the oldest entry when the max number of items is reached.
 *
 * We don't care about taking a different cache for every different slave
 * since to fill the cache again is not very costly, the goal of this code
 * is to avoid that the same big script is transmitted a big number of times
 * per second wasting bandwidth and processor speed, but it is not a problem
 * if we need to rebuild the cache from scratch from time to time, every used
 * script will need to be transmitted a single time to reappear in the cache.
 *
 * This is how the system works:
 *
 * 1) Every time a new slave connects, we flush the whole script cache.
 * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
 *    trying to convert EVAL into EVALSHA specifically for slaves.
1727 * 3) Every time we trasmit a script as EVAL to the slaves, we also add the 1728 * corresponding SHA1 of the script into the cache as we are sure every 1729 * slave knows about the script starting from now. 1730 * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves 1731 * and at the same time flush the script cache. 1732 * 5) When the last slave disconnects, flush the cache. 1733 * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded 1734 * in the master sometimes. 1735 */ 1736 1737 /* Initialize the script cache, only called at startup. */ 1738 void replicationScriptCacheInit(void) { 1739 server.repl_scriptcache_size = 10000; 1740 server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL); 1741 server.repl_scriptcache_fifo = listCreate(); 1742 } 1743 1744 /* Empty the script cache. Should be called every time we are no longer sure 1745 * that every slave knows about all the scripts in our set, or when the 1746 * current AOF "context" is no longer aware of the script. In general we 1747 * should flush the cache: 1748 * 1749 * 1) Every time a new slave reconnects to this master and performs a 1750 * full SYNC (PSYNC does not require flushing). 1751 * 2) Every time an AOF rewrite is performed. 1752 * 3) Every time we are left without slaves at all, and AOF is off, in order 1753 * to reclaim otherwise unused memory. 1754 */ 1755 void replicationScriptCacheFlush(void) { 1756 dictEmpty(server.repl_scriptcache_dict,NULL); 1757 listRelease(server.repl_scriptcache_fifo); 1758 server.repl_scriptcache_fifo = listCreate(); 1759 } 1760 1761 /* Add an entry into the script cache, if we reach max number of entries the 1762 * oldest is removed from the list. */ 1763 void replicationScriptCacheAdd(sds sha1) { 1764 int retval; 1765 sds key = sdsdup(sha1); 1766 1767 /* Evict oldest. 
*/ 1768 if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size) 1769 { 1770 listNode *ln = listLast(server.repl_scriptcache_fifo); 1771 sds oldest = listNodeValue(ln); 1772 1773 retval = dictDelete(server.repl_scriptcache_dict,oldest); 1774 redisAssert(retval == DICT_OK); 1775 listDelNode(server.repl_scriptcache_fifo,ln); 1776 } 1777 1778 /* Add current. */ 1779 retval = dictAdd(server.repl_scriptcache_dict,key,NULL); 1780 listAddNodeHead(server.repl_scriptcache_fifo,key); 1781 redisAssert(retval == DICT_OK); 1782 } 1783 1784 /* Returns non-zero if the specified entry exists inside the cache, that is, 1785 * if all the slaves are aware of this script SHA1. */ 1786 int replicationScriptCacheExists(sds sha1) { 1787 return dictFind(server.repl_scriptcache_dict,sha1) != NULL; 1788 } 1789 1790 /* ----------------------- SYNCHRONOUS REPLICATION -------------------------- 1791 * Redis synchronous replication design can be summarized in points: 1792 * 1793 * - Redis masters have a global replication offset, used by PSYNC. 1794 * - Master increment the offset every time new commands are sent to slaves. 1795 * - Slaves ping back masters with the offset processed so far. 1796 * 1797 * So synchronous replication adds a new WAIT command in the form: 1798 * 1799 * WAIT <num_replicas> <milliseconds_timeout> 1800 * 1801 * That returns the number of replicas that processed the query when 1802 * we finally have at least num_replicas, or when the timeout was 1803 * reached. 1804 * 1805 * The command is implemented in this way: 1806 * 1807 * - Every time a client processes a command, we remember the replication 1808 * offset after sending that command to the slaves. 1809 * - When WAIT is called, we ask slaves to send an acknowledgement ASAP. 1810 * The client is blocked at the same time (see blocked.c). 
1811 * - Once we receive enough ACKs for a given offset or when the timeout 1812 * is reached, the WAIT command is unblocked and the reply sent to the 1813 * client. 1814 */ 1815 1816 /* This just set a flag so that we broadcast a REPLCONF GETACK command 1817 * to all the slaves in the beforeSleep() function. Note that this way 1818 * we "group" all the clients that want to wait for synchronouns replication 1819 * in a given event loop iteration, and send a single GETACK for them all. */ 1820 void replicationRequestAckFromSlaves(void) { 1821 server.get_ack_from_slaves = 1; 1822 } 1823 1824 /* Return the number of slaves that already acknowledged the specified 1825 * replication offset. */ 1826 int replicationCountAcksByOffset(long long offset) { 1827 listIter li; 1828 listNode *ln; 1829 int count = 0; 1830 1831 listRewind(server.slaves,&li); 1832 while((ln = listNext(&li))) { 1833 redisClient *slave = ln->value; 1834 1835 if (slave->replstate != REDIS_REPL_ONLINE) continue; 1836 if (slave->repl_ack_off >= offset) count++; 1837 } 1838 return count; 1839 } 1840 1841 /* WAIT for N replicas to acknowledge the processing of our latest 1842 * write command (and all the previous commands). */ 1843 void waitCommand(redisClient *c) { 1844 mstime_t timeout; 1845 long numreplicas, ackreplicas; 1846 long long offset = c->woff; 1847 1848 /* Argument parsing. */ 1849 if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != REDIS_OK) 1850 return; 1851 if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS) 1852 != REDIS_OK) return; 1853 1854 /* First try without blocking at all. */ 1855 ackreplicas = replicationCountAcksByOffset(c->woff); 1856 if (ackreplicas >= numreplicas || c->flags & REDIS_MULTI) { 1857 addReplyLongLong(c,ackreplicas); 1858 return; 1859 } 1860 1861 /* Otherwise block the client and put it into our list of clients 1862 * waiting for ack from slaves. 
*/ 1863 c->bpop.timeout = timeout; 1864 c->bpop.reploffset = offset; 1865 c->bpop.numreplicas = numreplicas; 1866 listAddNodeTail(server.clients_waiting_acks,c); 1867 blockClient(c,REDIS_BLOCKED_WAIT); 1868 1869 /* Make sure that the server will send an ACK request to all the slaves 1870 * before returning to the event loop. */ 1871 replicationRequestAckFromSlaves(); 1872 } 1873 1874 /* This is called by unblockClient() to perform the blocking op type 1875 * specific cleanup. We just remove the client from the list of clients 1876 * waiting for replica acks. Never call it directly, call unblockClient() 1877 * instead. */ 1878 void unblockClientWaitingReplicas(redisClient *c) { 1879 listNode *ln = listSearchKey(server.clients_waiting_acks,c); 1880 redisAssert(ln != NULL); 1881 listDelNode(server.clients_waiting_acks,ln); 1882 } 1883 1884 /* Check if there are clients blocked in WAIT that can be unblocked since 1885 * we received enough ACKs from slaves. */ 1886 void processClientsWaitingReplicas(void) { 1887 long long last_offset = 0; 1888 int last_numreplicas = 0; 1889 1890 listIter li; 1891 listNode *ln; 1892 1893 listRewind(server.clients_waiting_acks,&li); 1894 while((ln = listNext(&li))) { 1895 redisClient *c = ln->value; 1896 1897 /* Every time we find a client that is satisfied for a given 1898 * offset and number of replicas, we remember it so the next client 1899 * may be unblocked without calling replicationCountAcksByOffset() 1900 * if the requested offset / replicas were equal or less. 
*/ 1901 if (last_offset && last_offset > c->bpop.reploffset && 1902 last_numreplicas > c->bpop.numreplicas) 1903 { 1904 unblockClient(c); 1905 addReplyLongLong(c,last_numreplicas); 1906 } else { 1907 int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset); 1908 1909 if (numreplicas >= c->bpop.numreplicas) { 1910 last_offset = c->bpop.reploffset; 1911 last_numreplicas = numreplicas; 1912 unblockClient(c); 1913 addReplyLongLong(c,numreplicas); 1914 } 1915 } 1916 } 1917 } 1918 1919 /* Return the slave replication offset for this instance, that is 1920 * the offset for which we already processed the master replication stream. */ 1921 long long replicationGetSlaveOffset(void) { 1922 long long offset = 0; 1923 1924 if (server.masterhost != NULL) { 1925 if (server.master) { 1926 offset = server.master->reploff; 1927 } else if (server.cached_master) { 1928 offset = server.cached_master->reploff; 1929 } 1930 } 1931 /* offset may be -1 when the master does not support it at all, however 1932 * this function is designed to return an offset that can express the 1933 * amount of data processed by the master, so we return a positive 1934 * integer. */ 1935 if (offset < 0) offset = 0; 1936 return offset; 1937 } 1938 1939 /* --------------------------- REPLICATION CRON ---------------------------- */ 1940 1941 /* Replication cron function, called 1 time per second. */ 1942 void replicationCron(void) { 1943 /* Non blocking connection timeout? */ 1944 if (server.masterhost && 1945 (server.repl_state == REDIS_REPL_CONNECTING || 1946 server.repl_state == REDIS_REPL_RECEIVE_PONG) && 1947 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) 1948 { 1949 redisLog(REDIS_WARNING,"Timeout connecting to the MASTER..."); 1950 undoConnectWithMaster(); 1951 } 1952 1953 /* Bulk transfer I/O timeout? 
*/ 1954 if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER && 1955 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) 1956 { 1957 redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value."); 1958 replicationAbortSyncTransfer(); 1959 } 1960 1961 /* Timed out master when we are an already connected slave? */ 1962 if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED && 1963 (time(NULL)-server.master->lastinteraction) > server.repl_timeout) 1964 { 1965 redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received..."); 1966 freeClient(server.master); 1967 } 1968 1969 /* Check if we should connect to a MASTER */ 1970 if (server.repl_state == REDIS_REPL_CONNECT) { 1971 redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d", 1972 server.masterhost, server.masterport); 1973 if (connectWithMaster() == REDIS_OK) { 1974 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started"); 1975 } 1976 } 1977 1978 /* Send ACK to master from time to time. 1979 * Note that we do not send periodic acks to masters that don't 1980 * support PSYNC and replication offsets. */ 1981 if (server.masterhost && server.master && 1982 !(server.master->flags & REDIS_PRE_PSYNC)) 1983 replicationSendAck(); 1984 1985 /* If we have attached slaves, PING them from time to time. 1986 * So slaves can implement an explicit timeout to masters, and will 1987 * be able to detect a link disconnection even if the TCP connection 1988 * will not actually go down. 
*/ 1989 if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) { 1990 listIter li; 1991 listNode *ln; 1992 robj *ping_argv[1]; 1993 1994 /* First, send PING */ 1995 ping_argv[0] = createStringObject("PING",4); 1996 replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); 1997 decrRefCount(ping_argv[0]); 1998 1999 /* Second, send a newline to all the slaves in pre-synchronization 2000 * stage, that is, slaves waiting for the master to create the RDB file. 2001 * The newline will be ignored by the slave but will refresh the 2002 * last-io timer preventing a timeout. */ 2003 listRewind(server.slaves,&li); 2004 while((ln = listNext(&li))) { 2005 redisClient *slave = ln->value; 2006 2007 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START || 2008 (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END && 2009 server.rdb_child_type != REDIS_RDB_CHILD_TYPE_SOCKET)) 2010 { 2011 if (write(slave->fd, "\n", 1) == -1) { 2012 /* Don't worry, it's just a ping. */ 2013 } 2014 } 2015 } 2016 } 2017 2018 /* Disconnect timedout slaves. */ 2019 if (listLength(server.slaves)) { 2020 listIter li; 2021 listNode *ln; 2022 2023 listRewind(server.slaves,&li); 2024 while((ln = listNext(&li))) { 2025 redisClient *slave = ln->value; 2026 2027 if (slave->replstate != REDIS_REPL_ONLINE) continue; 2028 if (slave->flags & REDIS_PRE_PSYNC) continue; 2029 if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) 2030 { 2031 redisLog(REDIS_WARNING, "Disconnecting timedout slave: %s", 2032 replicationGetSlaveName(slave)); 2033 freeClient(slave); 2034 } 2035 } 2036 } 2037 2038 /* If we have no attached slaves and there is a replication backlog 2039 * using memory, free it after some (configured) time. 
*/ 2040 if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && 2041 server.repl_backlog) 2042 { 2043 time_t idle = server.unixtime - server.repl_no_slaves_since; 2044 2045 if (idle > server.repl_backlog_time_limit) { 2046 freeReplicationBacklog(); 2047 redisLog(REDIS_NOTICE, 2048 "Replication backlog freed after %d seconds " 2049 "without connected slaves.", 2050 (int) server.repl_backlog_time_limit); 2051 } 2052 } 2053 2054 /* If AOF is disabled and we no longer have attached slaves, we can 2055 * free our Replication Script Cache as there is no need to propagate 2056 * EVALSHA at all. */ 2057 if (listLength(server.slaves) == 0 && 2058 server.aof_state == REDIS_AOF_OFF && 2059 listLength(server.repl_scriptcache_fifo) != 0) 2060 { 2061 replicationScriptCacheFlush(); 2062 } 2063 2064 /* If we are using diskless replication and there are slaves waiting 2065 * in WAIT_BGSAVE_START state, check if enough seconds elapsed and 2066 * start a BGSAVE. 2067 * 2068 * This code is also useful to trigger a BGSAVE if the diskless 2069 * replication was turned off with CONFIG SET, while there were already 2070 * slaves in WAIT_BGSAVE_START state. */ 2071 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { 2072 time_t idle, max_idle = 0; 2073 int slaves_waiting = 0; 2074 listNode *ln; 2075 listIter li; 2076 2077 listRewind(server.slaves,&li); 2078 while((ln = listNext(&li))) { 2079 redisClient *slave = ln->value; 2080 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { 2081 idle = server.unixtime - slave->lastinteraction; 2082 if (idle > max_idle) max_idle = idle; 2083 slaves_waiting++; 2084 } 2085 } 2086 2087 if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) { 2088 /* Start a BGSAVE. Usually with socket target, or with disk target 2089 * if there was a recent socket -> disk config change. */ 2090 if (startBgsaveForReplication() == REDIS_OK) { 2091 /* It started! 
We need to change the state of slaves 2092 * from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case 2093 * the current target is disk. Otherwise it was already done 2094 * by rdbSaveToSlavesSockets() which is called by 2095 * startBgsaveForReplication(). */ 2096 listRewind(server.slaves,&li); 2097 while((ln = listNext(&li))) { 2098 redisClient *slave = ln->value; 2099 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) 2100 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; 2101 } 2102 } 2103 } 2104 } 2105 2106 /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ 2107 refreshGoodSlavesCount(); 2108 } 2109