1 /* Asynchronous replication implementation. 2 * 3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * * Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * * Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * * Neither the name of Redis nor the names of its contributors may be used 15 * to endorse or promote products derived from this software without 16 * specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 
29 */ 30 31 32 #include "redis.h" 33 34 #include <sys/time.h> 35 #include <unistd.h> 36 #include <fcntl.h> 37 #include <sys/socket.h> 38 #include <sys/stat.h> 39 40 void replicationDiscardCachedMaster(void); 41 void replicationResurrectCachedMaster(int newfd); 42 void replicationSendAck(void); 43 void putSlaveOnline(redisClient *slave); 44 45 /* --------------------------- Utility functions ---------------------------- */ 46 47 /* Return the pointer to a string representing the slave ip:listening_port 48 * pair. Mostly useful for logging, since we want to log a slave using its 49 * IP address and it's listening port which is more clear for the user, for 50 * example: "Closing connection with slave 10.1.2.3:6380". */ 51 char *replicationGetSlaveName(redisClient *c) { 52 static char buf[REDIS_PEER_ID_LEN]; 53 char ip[REDIS_IP_STR_LEN]; 54 55 ip[0] = '\0'; 56 buf[0] = '\0'; 57 if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) { 58 if (c->slave_listening_port) 59 anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port); 60 else 61 snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip); 62 } else { 63 snprintf(buf,sizeof(buf),"client id #%llu", 64 (unsigned long long) c->id); 65 } 66 return buf; 67 } 68 69 /* ---------------------------------- MASTER -------------------------------- */ 70 71 void createReplicationBacklog(void) { 72 redisAssert(server.repl_backlog == NULL); 73 server.repl_backlog = zmalloc(server.repl_backlog_size); 74 server.repl_backlog_histlen = 0; 75 server.repl_backlog_idx = 0; 76 /* When a new backlog buffer is created, we increment the replication 77 * offset by one to make sure we'll not be able to PSYNC with any 78 * previous slave. This is needed because we avoid incrementing the 79 * master_repl_offset if no backlog exists nor slaves are attached. 
*/ 80 server.master_repl_offset++; 81 82 /* We don't have any data inside our buffer, but virtually the first 83 * byte we have is the next byte that will be generated for the 84 * replication stream. */ 85 server.repl_backlog_off = server.master_repl_offset+1; 86 } 87 88 /* This function is called when the user modifies the replication backlog 89 * size at runtime. It is up to the function to both update the 90 * server.repl_backlog_size and to resize the buffer and setup it so that 91 * it contains the same data as the previous one (possibly less data, but 92 * the most recent bytes, or the same data and more free space in case the 93 * buffer is enlarged). */ 94 void resizeReplicationBacklog(long long newsize) { 95 if (newsize < REDIS_REPL_BACKLOG_MIN_SIZE) 96 newsize = REDIS_REPL_BACKLOG_MIN_SIZE; 97 if (server.repl_backlog_size == newsize) return; 98 99 server.repl_backlog_size = newsize; 100 if (server.repl_backlog != NULL) { 101 /* What we actually do is to flush the old buffer and realloc a new 102 * empty one. It will refill with new data incrementally. 103 * The reason is that copying a few gigabytes adds latency and even 104 * worse often we need to alloc additional space before freeing the 105 * old buffer. */ 106 zfree(server.repl_backlog); 107 server.repl_backlog = zmalloc(server.repl_backlog_size); 108 server.repl_backlog_histlen = 0; 109 server.repl_backlog_idx = 0; 110 /* Next byte we have is... the next since the buffer is empty. */ 111 server.repl_backlog_off = server.master_repl_offset+1; 112 } 113 } 114 115 void freeReplicationBacklog(void) { 116 redisAssert(listLength(server.slaves) == 0); 117 zfree(server.repl_backlog); 118 server.repl_backlog = NULL; 119 } 120 121 /* Add data to the replication backlog. 122 * This function also increments the global replication offset stored at 123 * server.master_repl_offset, because there is no case where we want to feed 124 * the backlog without incrementing the buffer. 
*/ 125 void feedReplicationBacklog(void *ptr, size_t len) { 126 unsigned char *p = ptr; 127 128 server.master_repl_offset += len; 129 130 /* This is a circular buffer, so write as much data we can at every 131 * iteration and rewind the "idx" index if we reach the limit. */ 132 while(len) { 133 size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; 134 if (thislen > len) thislen = len; 135 memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); 136 server.repl_backlog_idx += thislen; 137 if (server.repl_backlog_idx == server.repl_backlog_size) 138 server.repl_backlog_idx = 0; 139 len -= thislen; 140 p += thislen; 141 server.repl_backlog_histlen += thislen; 142 } 143 if (server.repl_backlog_histlen > server.repl_backlog_size) 144 server.repl_backlog_histlen = server.repl_backlog_size; 145 /* Set the offset of the first byte we have in the backlog. */ 146 server.repl_backlog_off = server.master_repl_offset - 147 server.repl_backlog_histlen + 1; 148 } 149 150 /* Wrapper for feedReplicationBacklog() that takes Redis string objects 151 * as input. */ 152 void feedReplicationBacklogWithObject(robj *o) { 153 char llstr[REDIS_LONGSTR_SIZE]; 154 void *p; 155 size_t len; 156 157 if (o->encoding == REDIS_ENCODING_INT) { 158 len = ll2string(llstr,sizeof(llstr),(long)o->ptr); 159 p = llstr; 160 } else { 161 len = sdslen(o->ptr); 162 p = o->ptr; 163 } 164 feedReplicationBacklog(p,len); 165 } 166 167 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { 168 listNode *ln; 169 listIter li; 170 int j, len; 171 char llstr[REDIS_LONGSTR_SIZE]; 172 173 /* If there aren't slaves, and there is no backlog buffer to populate, 174 * we can return ASAP. */ 175 if (server.repl_backlog == NULL && listLength(slaves) == 0) return; 176 177 /* We can't have slaves attached and no backlog. */ 178 redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); 179 180 /* Send SELECT command to every slave if needed. 
*/ 181 if (server.slaveseldb != dictid) { 182 robj *selectcmd; 183 184 /* For a few DBs we have pre-computed SELECT command. */ 185 if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) { 186 selectcmd = shared.select[dictid]; 187 } else { 188 int dictid_len; 189 190 dictid_len = ll2string(llstr,sizeof(llstr),dictid); 191 selectcmd = createObject(REDIS_STRING, 192 sdscatprintf(sdsempty(), 193 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", 194 dictid_len, llstr)); 195 } 196 197 /* Add the SELECT command into the backlog. */ 198 if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); 199 200 /* Send it to slaves. */ 201 listRewind(slaves,&li); 202 while((ln = listNext(&li))) { 203 redisClient *slave = ln->value; 204 addReply(slave,selectcmd); 205 } 206 207 if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS) 208 decrRefCount(selectcmd); 209 } 210 server.slaveseldb = dictid; 211 212 /* Write the command to the replication backlog if any. */ 213 if (server.repl_backlog) { 214 char aux[REDIS_LONGSTR_SIZE+3]; 215 216 /* Add the multi bulk reply length. */ 217 aux[0] = '*'; 218 len = ll2string(aux+1,sizeof(aux)-1,argc); 219 aux[len+1] = '\r'; 220 aux[len+2] = '\n'; 221 feedReplicationBacklog(aux,len+3); 222 223 for (j = 0; j < argc; j++) { 224 long objlen = stringObjectLen(argv[j]); 225 226 /* We need to feed the buffer with the object as a bulk reply 227 * not just as a plain string, so create the $..CRLF payload len 228 * and add the final CRLF */ 229 aux[0] = '$'; 230 len = ll2string(aux+1,sizeof(aux)-1,objlen); 231 aux[len+1] = '\r'; 232 aux[len+2] = '\n'; 233 feedReplicationBacklog(aux,len+3); 234 feedReplicationBacklogWithObject(argv[j]); 235 feedReplicationBacklog(aux+len+1,2); 236 } 237 } 238 239 /* Write the command to every slave. 
*/ 240 listRewind(server.slaves,&li); 241 while((ln = listNext(&li))) { 242 redisClient *slave = ln->value; 243 244 /* Don't feed slaves that are still waiting for BGSAVE to start */ 245 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue; 246 247 /* Feed slaves that are waiting for the initial SYNC (so these commands 248 * are queued in the output buffer until the initial SYNC completes), 249 * or are already in sync with the master. */ 250 251 /* Add the multi bulk length. */ 252 addReplyMultiBulkLen(slave,argc); 253 254 /* Finally any additional argument that was not stored inside the 255 * static buffer if any (from j to argc). */ 256 for (j = 0; j < argc; j++) 257 addReplyBulk(slave,argv[j]); 258 } 259 } 260 261 void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) { 262 listNode *ln; 263 listIter li; 264 int j; 265 sds cmdrepr = sdsnew("+"); 266 robj *cmdobj; 267 struct timeval tv; 268 269 gettimeofday(&tv,NULL); 270 cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); 271 if (c->flags & REDIS_LUA_CLIENT) { 272 cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); 273 } else if (c->flags & REDIS_UNIX_SOCKET) { 274 cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); 275 } else { 276 cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c)); 277 } 278 279 for (j = 0; j < argc; j++) { 280 if (argv[j]->encoding == REDIS_ENCODING_INT) { 281 cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr); 282 } else { 283 cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, 284 sdslen(argv[j]->ptr)); 285 } 286 if (j != argc-1) 287 cmdrepr = sdscatlen(cmdrepr," ",1); 288 } 289 cmdrepr = sdscatlen(cmdrepr,"\r\n",2); 290 cmdobj = createObject(REDIS_STRING,cmdrepr); 291 292 listRewind(monitors,&li); 293 while((ln = listNext(&li))) { 294 redisClient *monitor = ln->value; 295 addReply(monitor,cmdobj); 296 } 297 decrRefCount(cmdobj); 298 } 299 300 /* Feed the slave 
'c' with the replication backlog starting from the 301 * specified 'offset' up to the end of the backlog. */ 302 long long addReplyReplicationBacklog(redisClient *c, long long offset) { 303 long long j, skip, len; 304 305 redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset); 306 307 if (server.repl_backlog_histlen == 0) { 308 redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero"); 309 return 0; 310 } 311 312 redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld", 313 server.repl_backlog_size); 314 redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld", 315 server.repl_backlog_off); 316 redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld", 317 server.repl_backlog_histlen); 318 redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld", 319 server.repl_backlog_idx); 320 321 /* Compute the amount of bytes we need to discard. */ 322 skip = offset - server.repl_backlog_off; 323 redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip); 324 325 /* Point j to the oldest byte, that is actaully our 326 * server.repl_backlog_off byte. */ 327 j = (server.repl_backlog_idx + 328 (server.repl_backlog_size-server.repl_backlog_histlen)) % 329 server.repl_backlog_size; 330 redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j); 331 332 /* Discard the amount of data to seek to the specified 'offset'. */ 333 j = (j + skip) % server.repl_backlog_size; 334 335 /* Feed slave with data. Since it is a circular buffer we have to 336 * split the reply in two parts if we are cross-boundary. */ 337 len = server.repl_backlog_histlen - skip; 338 redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len); 339 while(len) { 340 long long thislen = 341 ((server.repl_backlog_size - j) < len) ? 
342 (server.repl_backlog_size - j) : len; 343 344 redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen); 345 addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); 346 len -= thislen; 347 j = 0; 348 } 349 return server.repl_backlog_histlen - skip; 350 } 351 352 /* This function handles the PSYNC command from the point of view of a 353 * master receiving a request for partial resynchronization. 354 * 355 * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed 356 * with the usual full resync. */ 357 int masterTryPartialResynchronization(redisClient *c) { 358 long long psync_offset, psync_len; 359 char *master_runid = c->argv[1]->ptr; 360 char buf[128]; 361 int buflen; 362 363 /* Is the runid of this master the same advertised by the wannabe slave 364 * via PSYNC? If runid changed this master is a different instance and 365 * there is no way to continue. */ 366 if (strcasecmp(master_runid, server.runid)) { 367 /* Run id "?" is used by slaves that want to force a full resync. */ 368 if (master_runid[0] != '?') { 369 redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: " 370 "Runid mismatch (Client asked for runid '%s', my runid is '%s')", 371 master_runid, server.runid); 372 } else { 373 redisLog(REDIS_NOTICE,"Full resync requested by slave %s", 374 replicationGetSlaveName(c)); 375 } 376 goto need_full_resync; 377 } 378 379 /* We still have the data our slave is asking for? 
*/ 380 if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != 381 REDIS_OK) goto need_full_resync; 382 if (!server.repl_backlog || 383 psync_offset < server.repl_backlog_off || 384 psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) 385 { 386 redisLog(REDIS_NOTICE, 387 "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset); 388 if (psync_offset > server.master_repl_offset) { 389 redisLog(REDIS_WARNING, 390 "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); 391 } 392 goto need_full_resync; 393 } 394 395 /* If we reached this point, we are able to perform a partial resync: 396 * 1) Set client state to make it a slave. 397 * 2) Inform the client we can continue with +CONTINUE 398 * 3) Send the backlog data (from the offset to the end) to the slave. */ 399 c->flags |= REDIS_SLAVE; 400 c->replstate = REDIS_REPL_ONLINE; 401 c->repl_ack_time = server.unixtime; 402 c->repl_put_online_on_ack = 0; 403 listAddNodeTail(server.slaves,c); 404 /* We can't use the connection buffers since they are used to accumulate 405 * new commands at this stage. But we are sure the socket send buffer is 406 * empty so this write will never fail actually. */ 407 buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); 408 if (write(c->fd,buf,buflen) != buflen) { 409 freeClientAsync(c); 410 return REDIS_OK; 411 } 412 psync_len = addReplyReplicationBacklog(c,psync_offset); 413 redisLog(REDIS_NOTICE, 414 "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", 415 replicationGetSlaveName(c), 416 psync_len, psync_offset); 417 /* Note that we don't need to set the selected DB at server.slaveseldb 418 * to -1 to force the master to emit SELECT, since the slave already 419 * has this state from the previous connection with the master. 
*/ 420 421 refreshGoodSlavesCount(); 422 return REDIS_OK; /* The caller can return, no full resync needed. */ 423 424 need_full_resync: 425 /* We need a full resync for some reason... notify the client. */ 426 psync_offset = server.master_repl_offset; 427 /* Add 1 to psync_offset if it the replication backlog does not exists 428 * as when it will be created later we'll increment the offset by one. */ 429 if (server.repl_backlog == NULL) psync_offset++; 430 /* Again, we can't use the connection buffers (see above). */ 431 buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", 432 server.runid,psync_offset); 433 if (write(c->fd,buf,buflen) != buflen) { 434 freeClientAsync(c); 435 return REDIS_OK; 436 } 437 return REDIS_ERR; 438 } 439 440 /* Start a BGSAVE for replication goals, which is, selecting the disk or 441 * socket target depending on the configuration, and making sure that 442 * the script cache is flushed before to start. 443 * 444 * Returns REDIS_OK on success or REDIS_ERR otherwise. */ 445 int startBgsaveForReplication(void) { 446 int retval; 447 448 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s", 449 server.repl_diskless_sync ? "slaves sockets" : "disk"); 450 451 if (server.repl_diskless_sync) 452 retval = rdbSaveToSlavesSockets(); 453 else 454 retval = rdbSaveBackground(server.rdb_filename); 455 456 /* Flush the script cache, since we need that slave differences are 457 * accumulated without requiring slaves to match our cached scripts. */ 458 if (retval == REDIS_OK) replicationScriptCacheFlush(); 459 return retval; 460 } 461 462 /* SYNC and PSYNC command implemenation. */ 463 void syncCommand(redisClient *c) { 464 /* ignore SYNC if already slave or in monitor mode */ 465 if (c->flags & REDIS_SLAVE) return; 466 467 /* Refuse SYNC requests if we are a slave but the link with our master 468 * is not ok... 
*/ 469 if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) { 470 addReplyError(c,"Can't SYNC while not connected with my master"); 471 return; 472 } 473 474 /* SYNC can't be issued when the server has pending data to send to 475 * the client about already issued commands. We need a fresh reply 476 * buffer registering the differences between the BGSAVE and the current 477 * dataset, so that we can copy to other slaves if needed. */ 478 if (listLength(c->reply) != 0 || c->bufpos != 0) { 479 addReplyError(c,"SYNC and PSYNC are invalid with pending output"); 480 return; 481 } 482 483 redisLog(REDIS_NOTICE,"Slave %s asks for synchronization", 484 replicationGetSlaveName(c)); 485 486 /* Try a partial resynchronization if this is a PSYNC command. 487 * If it fails, we continue with usual full resynchronization, however 488 * when this happens masterTryPartialResynchronization() already 489 * replied with: 490 * 491 * +FULLRESYNC <runid> <offset> 492 * 493 * So the slave knows the new runid and offset to try a PSYNC later 494 * if the connection with the master is lost. */ 495 if (!strcasecmp(c->argv[0]->ptr,"psync")) { 496 if (masterTryPartialResynchronization(c) == REDIS_OK) { 497 server.stat_sync_partial_ok++; 498 return; /* No full resync needed, return. */ 499 } else { 500 char *master_runid = c->argv[1]->ptr; 501 502 /* Increment stats for failed PSYNCs, but only if the 503 * runid is not "?", as this is used by slaves to force a full 504 * resync on purpose when they are not albe to partially 505 * resync. */ 506 if (master_runid[0] != '?') server.stat_sync_partial_err++; 507 } 508 } else { 509 /* If a slave uses SYNC, we are dealing with an old implementation 510 * of the replication protocol (like redis-cli --slave). Flag the client 511 * so that we don't expect to receive REPLCONF ACK feedbacks. */ 512 c->flags |= REDIS_PRE_PSYNC; 513 } 514 515 /* Full resynchronization. 
*/ 516 server.stat_sync_full++; 517 518 /* Here we need to check if there is a background saving operation 519 * in progress, or if it is required to start one */ 520 if (server.rdb_child_pid != -1 && 521 server.rdb_child_type == REDIS_RDB_CHILD_TYPE_DISK) 522 { 523 /* Ok a background save is in progress. Let's check if it is a good 524 * one for replication, i.e. if there is another slave that is 525 * registering differences since the server forked to save. */ 526 redisClient *slave; 527 listNode *ln; 528 listIter li; 529 530 listRewind(server.slaves,&li); 531 while((ln = listNext(&li))) { 532 slave = ln->value; 533 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break; 534 } 535 if (ln) { 536 /* Perfect, the server is already registering differences for 537 * another slave. Set the right state, and copy the buffer. */ 538 copyClientOutputBuffer(c,slave); 539 c->replstate = REDIS_REPL_WAIT_BGSAVE_END; 540 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); 541 } else { 542 /* No way, we need to wait for the next BGSAVE in order to 543 * register differences. */ 544 c->replstate = REDIS_REPL_WAIT_BGSAVE_START; 545 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); 546 } 547 } else if (server.rdb_child_pid != -1 && 548 server.rdb_child_type == REDIS_RDB_CHILD_TYPE_SOCKET) 549 { 550 /* There is an RDB child process but it is writing directly to 551 * children sockets. We need to wait for the next BGSAVE 552 * in order to synchronize. */ 553 c->replstate = REDIS_REPL_WAIT_BGSAVE_START; 554 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); 555 } else { 556 if (server.repl_diskless_sync) { 557 /* Diskless replication RDB child is created inside 558 * replicationCron() since we want to delay its start a 559 * few seconds to wait for more slaves to arrive. 
*/ 560 c->replstate = REDIS_REPL_WAIT_BGSAVE_START; 561 if (server.repl_diskless_sync_delay) 562 redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC"); 563 } else { 564 /* Ok we don't have a BGSAVE in progress, let's start one. */ 565 if (startBgsaveForReplication() != REDIS_OK) { 566 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); 567 addReplyError(c,"Unable to perform background save"); 568 return; 569 } 570 c->replstate = REDIS_REPL_WAIT_BGSAVE_END; 571 } 572 } 573 574 if (server.repl_disable_tcp_nodelay) 575 anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */ 576 c->repldbfd = -1; 577 c->flags |= REDIS_SLAVE; 578 server.slaveseldb = -1; /* Force to re-emit the SELECT command. */ 579 listAddNodeTail(server.slaves,c); 580 if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) 581 createReplicationBacklog(); 582 return; 583 } 584 585 /* REPLCONF <option> <value> <option> <value> ... 586 * This command is used by a slave in order to configure the replication 587 * process before starting it with the SYNC command. 588 * 589 * Currently the only use of this command is to communicate to the master 590 * what is the listening port of the Slave redis instance, so that the 591 * master can accurately list slaves and their listening ports in 592 * the INFO output. 593 * 594 * In the future the same command can be used in order to configure 595 * the replication to initiate an incremental replication instead of a 596 * full resync. */ 597 void replconfCommand(redisClient *c) { 598 int j; 599 600 if ((c->argc % 2) == 0) { 601 /* Number of arguments must be odd to make sure that every 602 * option has a corresponding value. */ 603 addReply(c,shared.syntaxerr); 604 return; 605 } 606 607 /* Process every option-value pair. 
*/ 608 for (j = 1; j < c->argc; j+=2) { 609 if (!strcasecmp(c->argv[j]->ptr,"listening-port")) { 610 long port; 611 612 if ((getLongFromObjectOrReply(c,c->argv[j+1], 613 &port,NULL) != REDIS_OK)) 614 return; 615 c->slave_listening_port = port; 616 } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { 617 /* REPLCONF ACK is used by slave to inform the master the amount 618 * of replication stream that it processed so far. It is an 619 * internal only command that normal clients should never use. */ 620 long long offset; 621 622 if (!(c->flags & REDIS_SLAVE)) return; 623 if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK)) 624 return; 625 if (offset > c->repl_ack_off) 626 c->repl_ack_off = offset; 627 c->repl_ack_time = server.unixtime; 628 /* If this was a diskless replication, we need to really put 629 * the slave online when the first ACK is received (which 630 * confirms slave is online and ready to get more data). */ 631 if (c->repl_put_online_on_ack && c->replstate == REDIS_REPL_ONLINE) 632 putSlaveOnline(c); 633 /* Note: this command does not reply anything! */ 634 return; 635 } else if (!strcasecmp(c->argv[j]->ptr,"getack")) { 636 /* REPLCONF GETACK is used in order to request an ACK ASAP 637 * to the slave. */ 638 if (server.masterhost && server.master) replicationSendAck(); 639 /* Note: this command does not reply anything! */ 640 } else { 641 addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", 642 (char*)c->argv[j]->ptr); 643 return; 644 } 645 } 646 addReply(c,shared.ok); 647 } 648 649 /* This function puts a slave in the online state, and should be called just 650 * after a slave received the RDB file for the initial synchronization, and 651 * we are finally ready to send the incremental stream of commands. 652 * 653 * It does a few things: 654 * 655 * 1) Put the slave in ONLINE state. 
656 * 2) Make sure the writable event is re-installed, since calling the SYNC 657 * command disables it, so that we can accumulate output buffer without 658 * sending it to the slave. 659 * 3) Update the count of good slaves. */ 660 void putSlaveOnline(redisClient *slave) { 661 slave->replstate = REDIS_REPL_ONLINE; 662 slave->repl_put_online_on_ack = 0; 663 slave->repl_ack_time = server.unixtime; 664 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, 665 sendReplyToClient, slave) == AE_ERR) { 666 redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno)); 667 freeClient(slave); 668 return; 669 } 670 refreshGoodSlavesCount(); 671 redisLog(REDIS_NOTICE,"Synchronization with slave %s succeeded", 672 replicationGetSlaveName(slave)); 673 } 674 675 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { 676 redisClient *slave = privdata; 677 REDIS_NOTUSED(el); 678 REDIS_NOTUSED(mask); 679 char buf[REDIS_IOBUF_LEN]; 680 ssize_t nwritten, buflen; 681 682 /* Before sending the RDB file, we send the preamble as configured by the 683 * replication process. Currently the preamble is just the bulk count of 684 * the file in the form "$<length>\r\n". */ 685 if (slave->replpreamble) { 686 nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble)); 687 if (nwritten == -1) { 688 redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s", 689 strerror(errno)); 690 freeClient(slave); 691 return; 692 } 693 server.stat_net_output_bytes += nwritten; 694 sdsrange(slave->replpreamble,nwritten,-1); 695 if (sdslen(slave->replpreamble) == 0) { 696 sdsfree(slave->replpreamble); 697 slave->replpreamble = NULL; 698 /* fall through sending data. */ 699 } else { 700 return; 701 } 702 } 703 704 /* If the preamble was already transfered, send the RDB bulk data. 
*/ 705 lseek(slave->repldbfd,slave->repldboff,SEEK_SET); 706 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); 707 if (buflen <= 0) { 708 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s", 709 (buflen == 0) ? "premature EOF" : strerror(errno)); 710 freeClient(slave); 711 return; 712 } 713 if ((nwritten = write(fd,buf,buflen)) == -1) { 714 if (errno != EAGAIN) { 715 redisLog(REDIS_WARNING,"Write error sending DB to slave: %s", 716 strerror(errno)); 717 freeClient(slave); 718 } 719 return; 720 } 721 slave->repldboff += nwritten; 722 server.stat_net_output_bytes += nwritten; 723 if (slave->repldboff == slave->repldbsize) { 724 close(slave->repldbfd); 725 slave->repldbfd = -1; 726 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 727 putSlaveOnline(slave); 728 } 729 } 730 731 /* This function is called at the end of every background saving, 732 * or when the replication RDB transfer strategy is modified from 733 * disk to socket or the other way around. 734 * 735 * The goal of this function is to handle slaves waiting for a successful 736 * background saving in order to perform non-blocking synchronization, and 737 * to schedule a new BGSAVE if there are slaves that attached while a 738 * BGSAVE was in progress, but it was not a good one for replication (no 739 * other slave was accumulating differences). 740 * 741 * The argument bgsaveerr is REDIS_OK if the background saving succeeded 742 * otherwise REDIS_ERR is passed to the function. 743 * The 'type' argument is the type of the child that terminated 744 * (if it had a disk or socket target). 
*/ 745 void updateSlavesWaitingBgsave(int bgsaveerr, int type) { 746 listNode *ln; 747 int startbgsave = 0; 748 listIter li; 749 750 listRewind(server.slaves,&li); 751 while((ln = listNext(&li))) { 752 redisClient *slave = ln->value; 753 754 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { 755 startbgsave = 1; 756 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; 757 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { 758 struct redis_stat buf; 759 760 /* If this was an RDB on disk save, we have to prepare to send 761 * the RDB from disk to the slave socket. Otherwise if this was 762 * already an RDB -> Slaves socket transfer, used in the case of 763 * diskless replication, our work is trivial, we can just put 764 * the slave online. */ 765 if (type == REDIS_RDB_CHILD_TYPE_SOCKET) { 766 redisLog(REDIS_NOTICE, 767 "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming", 768 replicationGetSlaveName(slave)); 769 /* Note: we wait for a REPLCONF ACK message from slave in 770 * order to really put it online (install the write handler 771 * so that the accumulated data can be transfered). However 772 * we change the replication state ASAP, since our slave 773 * is technically online now. */ 774 slave->replstate = REDIS_REPL_ONLINE; 775 slave->repl_put_online_on_ack = 1; 776 } else { 777 if (bgsaveerr != REDIS_OK) { 778 freeClient(slave); 779 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); 780 continue; 781 } 782 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || 783 redis_fstat(slave->repldbfd,&buf) == -1) { 784 freeClient(slave); 785 redisLog(REDIS_WARNING,"SYNC failed. 
Can't open/stat DB after BGSAVE: %s", strerror(errno)); 786 continue; 787 } 788 slave->repldboff = 0; 789 slave->repldbsize = buf.st_size; 790 slave->replstate = REDIS_REPL_SEND_BULK; 791 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", 792 (unsigned long long) slave->repldbsize); 793 794 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 795 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { 796 freeClient(slave); 797 continue; 798 } 799 } 800 } 801 } 802 if (startbgsave) { 803 if (startBgsaveForReplication() != REDIS_OK) { 804 listIter li; 805 806 listRewind(server.slaves,&li); 807 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed"); 808 while((ln = listNext(&li))) { 809 redisClient *slave = ln->value; 810 811 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) 812 freeClient(slave); 813 } 814 } 815 } 816 } 817 818 /* ----------------------------------- SLAVE -------------------------------- */ 819 820 /* Abort the async download of the bulk dataset while SYNC-ing with master */ 821 void replicationAbortSyncTransfer(void) { 822 redisAssert(server.repl_state == REDIS_REPL_TRANSFER); 823 824 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); 825 close(server.repl_transfer_s); 826 close(server.repl_transfer_fd); 827 unlink(server.repl_transfer_tmpfile); 828 zfree(server.repl_transfer_tmpfile); 829 server.repl_state = REDIS_REPL_CONNECT; 830 } 831 832 /* Avoid the master to detect the slave is timing out while loading the 833 * RDB file in initial synchronization. We send a single newline character 834 * that is valid protocol but is guaranteed to either be sent entierly or 835 * not, since the byte is indivisible. 836 * 837 * The function is called in two contexts: while we flush the current 838 * data with emptyDb(), and while we load the new data received as an 839 * RDB file from the master. 
*/ 840 void replicationSendNewlineToMaster(void) { 841 static time_t newline_sent; 842 if (time(NULL) != newline_sent) { 843 newline_sent = time(NULL); 844 if (write(server.repl_transfer_s,"\n",1) == -1) { 845 /* Pinging back in this stage is best-effort. */ 846 } 847 } 848 } 849 850 /* Callback used by emptyDb() while flushing away old data to load 851 * the new dataset received by the master. */ 852 void replicationEmptyDbCallback(void *privdata) { 853 REDIS_NOTUSED(privdata); 854 replicationSendNewlineToMaster(); 855 } 856 857 /* Asynchronously read the SYNC payload we receive from a master */ 858 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ 859 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { 860 char buf[4096]; 861 ssize_t nread, readlen; 862 off_t left; 863 REDIS_NOTUSED(el); 864 REDIS_NOTUSED(privdata); 865 REDIS_NOTUSED(mask); 866 867 /* Static vars used to hold the EOF mark, and the last bytes received 868 * form the server: when they match, we reached the end of the transfer. */ 869 static char eofmark[REDIS_RUN_ID_SIZE]; 870 static char lastbytes[REDIS_RUN_ID_SIZE]; 871 static int usemark = 0; 872 873 /* If repl_transfer_size == -1 we still have to read the bulk length 874 * from the master reply. */ 875 if (server.repl_transfer_size == -1) { 876 if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { 877 redisLog(REDIS_WARNING, 878 "I/O error reading bulk count from MASTER: %s", 879 strerror(errno)); 880 goto error; 881 } 882 883 if (buf[0] == '-') { 884 redisLog(REDIS_WARNING, 885 "MASTER aborted replication with an error: %s", 886 buf+1); 887 goto error; 888 } else if (buf[0] == '\0') { 889 /* At this stage just a newline works as a PING in order to take 890 * the connection live. So we refresh our last interaction 891 * timestamp. 
*/ 892 server.repl_transfer_lastio = server.unixtime; 893 return; 894 } else if (buf[0] != '$') { 895 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); 896 goto error; 897 } 898 899 /* There are two possible forms for the bulk payload. One is the 900 * usual $<count> bulk format. The other is used for diskless transfers 901 * when the master does not know beforehand the size of the file to 902 * transfer. In the latter case, the following format is used: 903 * 904 * $EOF:<40 bytes delimiter> 905 * 906 * At the end of the file the announced delimiter is transmitted. The 907 * delimiter is long and random enough that the probability of a 908 * collision with the actual file content can be ignored. */ 909 if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) { 910 usemark = 1; 911 memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE); 912 memset(lastbytes,0,REDIS_RUN_ID_SIZE); 913 /* Set any repl_transfer_size to avoid entering this code path 914 * at the next call. */ 915 server.repl_transfer_size = 0; 916 redisLog(REDIS_NOTICE, 917 "MASTER <-> SLAVE sync: receiving streamed RDB from master"); 918 } else { 919 usemark = 0; 920 server.repl_transfer_size = strtol(buf+1,NULL,10); 921 redisLog(REDIS_NOTICE, 922 "MASTER <-> SLAVE sync: receiving %lld bytes from master", 923 (long long) server.repl_transfer_size); 924 } 925 return; 926 } 927 928 /* Read bulk data */ 929 if (usemark) { 930 readlen = sizeof(buf); 931 } else { 932 left = server.repl_transfer_size - server.repl_transfer_read; 933 readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); 934 } 935 936 nread = read(fd,buf,readlen); 937 if (nread <= 0) { 938 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", 939 (nread == -1) ? 
strerror(errno) : "connection lost"); 940 replicationAbortSyncTransfer(); 941 return; 942 } 943 server.stat_net_input_bytes += nread; 944 945 /* When a mark is used, we want to detect EOF asap in order to avoid 946 * writing the EOF mark into the file... */ 947 int eof_reached = 0; 948 949 if (usemark) { 950 /* Update the last bytes array, and check if it matches our delimiter.*/ 951 if (nread >= REDIS_RUN_ID_SIZE) { 952 memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE); 953 } else { 954 int rem = REDIS_RUN_ID_SIZE-nread; 955 memmove(lastbytes,lastbytes+nread,rem); 956 memcpy(lastbytes+rem,buf,nread); 957 } 958 if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1; 959 } 960 961 server.repl_transfer_lastio = server.unixtime; 962 if (write(server.repl_transfer_fd,buf,nread) != nread) { 963 redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); 964 goto error; 965 } 966 server.repl_transfer_read += nread; 967 968 /* Delete the last 40 bytes from the file if we reached EOF. */ 969 if (usemark && eof_reached) { 970 if (ftruncate(server.repl_transfer_fd, 971 server.repl_transfer_read - REDIS_RUN_ID_SIZE) == -1) 972 { 973 redisLog(REDIS_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); 974 goto error; 975 } 976 } 977 978 /* Sync data on disk from time to time, otherwise at the end of the transfer 979 * we may suffer a big delay as the memory buffers are copied into the 980 * actual disk. 
*/ 981 if (server.repl_transfer_read >= 982 server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) 983 { 984 off_t sync_size = server.repl_transfer_read - 985 server.repl_transfer_last_fsync_off; 986 rdb_fsync_range(server.repl_transfer_fd, 987 server.repl_transfer_last_fsync_off, sync_size); 988 server.repl_transfer_last_fsync_off += sync_size; 989 } 990 991 /* Check if the transfer is now complete */ 992 if (!usemark) { 993 if (server.repl_transfer_read == server.repl_transfer_size) 994 eof_reached = 1; 995 } 996 997 if (eof_reached) { 998 if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { 999 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); 1000 replicationAbortSyncTransfer(); 1001 return; 1002 } 1003 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data"); 1004 signalFlushedDb(-1); 1005 emptyDb(replicationEmptyDbCallback); 1006 /* Before loading the DB into memory we need to delete the readable 1007 * handler, otherwise it will get called recursively since 1008 * rdbLoad() will call the event loop to process events from time to 1009 * time for non blocking loading. 
*/ 1010 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); 1011 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); 1012 if (rdbLoad(server.rdb_filename) != REDIS_OK) { 1013 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); 1014 replicationAbortSyncTransfer(); 1015 return; 1016 } 1017 /* Final setup of the connected slave <- master link */ 1018 zfree(server.repl_transfer_tmpfile); 1019 close(server.repl_transfer_fd); 1020 server.master = createClient(server.repl_transfer_s); 1021 server.master->flags |= REDIS_MASTER; 1022 server.master->authenticated = 1; 1023 server.repl_state = REDIS_REPL_CONNECTED; 1024 server.master->reploff = server.repl_master_initial_offset; 1025 memcpy(server.master->replrunid, server.repl_master_runid, 1026 sizeof(server.repl_master_runid)); 1027 /* If master offset is set to -1, this master is old and is not 1028 * PSYNC capable, so we flag it accordingly. */ 1029 if (server.master->reploff == -1) 1030 server.master->flags |= REDIS_PRE_PSYNC; 1031 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success"); 1032 /* Restart the AOF subsystem now that we finished the sync. This 1033 * will trigger an AOF rewrite, and when done will start appending 1034 * to the new file. */ 1035 if (server.aof_state != REDIS_AOF_OFF) { 1036 int retry = 10; 1037 1038 stopAppendOnly(); 1039 while (retry-- && startAppendOnly() == REDIS_ERR) { 1040 redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second."); 1041 sleep(1); 1042 } 1043 if (!retry) { 1044 redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now."); 1045 exit(1); 1046 } 1047 } 1048 } 1049 1050 return; 1051 1052 error: 1053 replicationAbortSyncTransfer(); 1054 return; 1055 } 1056 1057 /* Send a synchronous command to the master. 
Used to send AUTH and
 * REPLCONF commands before starting the replication with SYNC.
 *
 * The variadic arguments are C strings (the command name followed by its
 * arguments) and the list MUST be terminated by a NULL pointer. The
 * arguments are joined with single spaces and sent using the inline
 * protocol, then one reply line is read back. Both operations use
 * server.repl_syncio_timeout as timeout.
 *
 * The command returns an sds string representing the result of the
 * operation, that the caller must release with sdsfree(). On error the
 * first byte is a "-".
 */
char *sendSynchronousCommand(int fd, ...) {
    va_list ap;
    sds cmd = sdsempty();
    char *arg, buf[256];

    /* Create the command to send to the master, we use simple inline
     * protocol for simplicity as currently we only send simple strings. */
    va_start(ap,fd);
    while(1) {
        arg = va_arg(ap, char*);
        if (arg == NULL) break;

        if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
        cmd = sdscat(cmd,arg);
    }
    /* Every va_start() must be matched by a va_end() in the same function
     * before it returns. */
    va_end(ap);
    cmd = sdscatlen(cmd,"\r\n",2);

    /* Transfer command to the server. */
    if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
        sdsfree(cmd);
        return sdscatprintf(sdsempty(),"-Writing to master: %s",
                strerror(errno));
    }
    sdsfree(cmd);

    /* Read the reply from the server. */
    if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
    {
        return sdscatprintf(sdsempty(),"-Reading from master: %s",
                strerror(errno));
    }
    return sdsnew(buf);
}

/* Try a partial resynchronization with the master if we are about to reconnect.
 * If there is no cached master structure, at least try to issue a
 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
 * command in order to obtain the master run id and the master replication
 * global offset.
 *
 * This function is designed to be called from syncWithMaster(), so the
 * following assumptions are made:
 *
 * 1) We pass the function an already connected socket "fd".
 * 2) This function does not close the file descriptor "fd".
However in case 1108 * of successful partial resynchronization, the function will reuse 1109 * 'fd' as file descriptor of the server.master client structure. 1110 * 1111 * The function returns: 1112 * 1113 * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue. 1114 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed. 1115 * In this case the master run_id and global replication 1116 * offset is saved. 1117 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and 1118 * the caller should fall back to SYNC. 1119 */ 1120 1121 #define PSYNC_CONTINUE 0 1122 #define PSYNC_FULLRESYNC 1 1123 #define PSYNC_NOT_SUPPORTED 2 1124 int slaveTryPartialResynchronization(int fd) { 1125 char *psync_runid; 1126 char psync_offset[32]; 1127 sds reply; 1128 1129 /* Initially set repl_master_initial_offset to -1 to mark the current 1130 * master run_id and offset as not valid. Later if we'll be able to do 1131 * a FULL resync using the PSYNC command we'll set the offset at the 1132 * right value, so that this information will be propagated to the 1133 * client structure representing the master into server.master. */ 1134 server.repl_master_initial_offset = -1; 1135 1136 if (server.cached_master) { 1137 psync_runid = server.cached_master->replrunid; 1138 snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); 1139 redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); 1140 } else { 1141 redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)"); 1142 psync_runid = "?"; 1143 memcpy(psync_offset,"-1",3); 1144 } 1145 1146 /* Issue the PSYNC command */ 1147 reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL); 1148 1149 if (!strncmp(reply,"+FULLRESYNC",11)) { 1150 char *runid = NULL, *offset = NULL; 1151 1152 /* FULL RESYNC, parse the reply in order to extract the run id 1153 * and the replication offset. 
*/ 1154 runid = strchr(reply,' '); 1155 if (runid) { 1156 runid++; 1157 offset = strchr(runid,' '); 1158 if (offset) offset++; 1159 } 1160 if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) { 1161 redisLog(REDIS_WARNING, 1162 "Master replied with wrong +FULLRESYNC syntax."); 1163 /* This is an unexpected condition, actually the +FULLRESYNC 1164 * reply means that the master supports PSYNC, but the reply 1165 * format seems wrong. To stay safe we blank the master 1166 * runid to make sure next PSYNCs will fail. */ 1167 memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1); 1168 } else { 1169 memcpy(server.repl_master_runid, runid, offset-runid-1); 1170 server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0'; 1171 server.repl_master_initial_offset = strtoll(offset,NULL,10); 1172 redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld", 1173 server.repl_master_runid, 1174 server.repl_master_initial_offset); 1175 } 1176 /* We are going to full resync, discard the cached master structure. */ 1177 replicationDiscardCachedMaster(); 1178 sdsfree(reply); 1179 return PSYNC_FULLRESYNC; 1180 } 1181 1182 if (!strncmp(reply,"+CONTINUE",9)) { 1183 /* Partial resync was accepted, set the replication state accordingly */ 1184 redisLog(REDIS_NOTICE, 1185 "Successful partial resynchronization with master."); 1186 sdsfree(reply); 1187 replicationResurrectCachedMaster(fd); 1188 return PSYNC_CONTINUE; 1189 } 1190 1191 /* If we reach this point we receied either an error since the master does 1192 * not understand PSYNC, or an unexpected reply from the master. 1193 * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */ 1194 1195 if (strncmp(reply,"-ERR",4)) { 1196 /* If it's not an error, log the unexpected event. 
*/ 1197 redisLog(REDIS_WARNING, 1198 "Unexpected reply to PSYNC from master: %s", reply); 1199 } else { 1200 redisLog(REDIS_NOTICE, 1201 "Master does not support PSYNC or is in " 1202 "error state (reply: %s)", reply); 1203 } 1204 sdsfree(reply); 1205 replicationDiscardCachedMaster(); 1206 return PSYNC_NOT_SUPPORTED; 1207 } 1208 1209 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { 1210 char tmpfile[256], *err; 1211 int dfd, maxtries = 5; 1212 int sockerr = 0, psync_result; 1213 socklen_t errlen = sizeof(sockerr); 1214 REDIS_NOTUSED(el); 1215 REDIS_NOTUSED(privdata); 1216 REDIS_NOTUSED(mask); 1217 1218 /* If this event fired after the user turned the instance into a master 1219 * with SLAVEOF NO ONE we must just return ASAP. */ 1220 if (server.repl_state == REDIS_REPL_NONE) { 1221 close(fd); 1222 return; 1223 } 1224 1225 /* Check for errors in the socket. */ 1226 if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) 1227 sockerr = errno; 1228 if (sockerr) { 1229 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1230 redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s", 1231 strerror(sockerr)); 1232 goto error; 1233 } 1234 1235 /* If we were connecting, it's time to send a non blocking PING, we want to 1236 * make sure the master is able to reply before going into the actual 1237 * replication process where we have long timeouts in the order of 1238 * seconds (in the meantime the slave would block). */ 1239 if (server.repl_state == REDIS_REPL_CONNECTING) { 1240 redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event."); 1241 /* Delete the writable event so that the readable event remains 1242 * registered and we can wait for the PONG reply. */ 1243 aeDeleteFileEvent(server.el,fd,AE_WRITABLE); 1244 server.repl_state = REDIS_REPL_RECEIVE_PONG; 1245 /* Send the PING, don't check for errors at all, we have the timeout 1246 * that will take care about this. 
*/ 1247 syncWrite(fd,"PING\r\n",6,100); 1248 return; 1249 } 1250 1251 /* Receive the PONG command. */ 1252 if (server.repl_state == REDIS_REPL_RECEIVE_PONG) { 1253 char buf[1024]; 1254 1255 /* Delete the readable event, we no longer need it now that there is 1256 * the PING reply to read. */ 1257 aeDeleteFileEvent(server.el,fd,AE_READABLE); 1258 1259 /* Read the reply with explicit timeout. */ 1260 buf[0] = '\0'; 1261 if (syncReadLine(fd,buf,sizeof(buf), 1262 server.repl_syncio_timeout*1000) == -1) 1263 { 1264 redisLog(REDIS_WARNING, 1265 "I/O error reading PING reply from master: %s", 1266 strerror(errno)); 1267 goto error; 1268 } 1269 1270 /* We accept only two replies as valid, a positive +PONG reply 1271 * (we just check for "+") or an authentication error. 1272 * Note that older versions of Redis replied with "operation not 1273 * permitted" instead of using a proper error code, so we test 1274 * both. */ 1275 if (buf[0] != '+' && 1276 strncmp(buf,"-NOAUTH",7) != 0 && 1277 strncmp(buf,"-ERR operation not permitted",28) != 0) 1278 { 1279 redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf); 1280 goto error; 1281 } else { 1282 redisLog(REDIS_NOTICE, 1283 "Master replied to PING, replication can continue..."); 1284 } 1285 } 1286 1287 /* AUTH with the master if required. */ 1288 if(server.masterauth) { 1289 err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL); 1290 if (err[0] == '-') { 1291 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err); 1292 sdsfree(err); 1293 goto error; 1294 } 1295 sdsfree(err); 1296 } 1297 1298 /* Set the slave port, so that Master's INFO command can list the 1299 * slave listening port correctly. */ 1300 { 1301 sds port = sdsfromlonglong(server.port); 1302 err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port, 1303 NULL); 1304 sdsfree(port); 1305 /* Ignore the error if any, not all the Redis versions support 1306 * REPLCONF listening-port. 
*/ 1307 if (err[0] == '-') { 1308 redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err); 1309 } 1310 sdsfree(err); 1311 } 1312 1313 /* Try a partial resynchonization. If we don't have a cached master 1314 * slaveTryPartialResynchronization() will at least try to use PSYNC 1315 * to start a full resynchronization so that we get the master run id 1316 * and the global offset, to try a partial resync at the next 1317 * reconnection attempt. */ 1318 psync_result = slaveTryPartialResynchronization(fd); 1319 if (psync_result == PSYNC_CONTINUE) { 1320 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization."); 1321 return; 1322 } 1323 1324 /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC 1325 * and the server.repl_master_runid and repl_master_initial_offset are 1326 * already populated. */ 1327 if (psync_result == PSYNC_NOT_SUPPORTED) { 1328 redisLog(REDIS_NOTICE,"Retrying with SYNC..."); 1329 if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { 1330 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s", 1331 strerror(errno)); 1332 goto error; 1333 } 1334 } 1335 1336 /* Prepare a suitable temp file for bulk transfer */ 1337 while(maxtries--) { 1338 snprintf(tmpfile,256, 1339 "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); 1340 dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); 1341 if (dfd != -1) break; 1342 sleep(1); 1343 } 1344 if (dfd == -1) { 1345 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno)); 1346 goto error; 1347 } 1348 1349 /* Setup the non blocking download of the bulk file. 
*/ 1350 if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) 1351 == AE_ERR) 1352 { 1353 redisLog(REDIS_WARNING, 1354 "Can't create readable event for SYNC: %s (fd=%d)", 1355 strerror(errno),fd); 1356 goto error; 1357 } 1358 1359 server.repl_state = REDIS_REPL_TRANSFER; 1360 server.repl_transfer_size = -1; 1361 server.repl_transfer_read = 0; 1362 server.repl_transfer_last_fsync_off = 0; 1363 server.repl_transfer_fd = dfd; 1364 server.repl_transfer_lastio = server.unixtime; 1365 server.repl_transfer_tmpfile = zstrdup(tmpfile); 1366 return; 1367 1368 error: 1369 close(fd); 1370 server.repl_transfer_s = -1; 1371 server.repl_state = REDIS_REPL_CONNECT; 1372 return; 1373 } 1374 1375 int connectWithMaster(void) { 1376 int fd; 1377 1378 fd = anetTcpNonBlockBindConnect(NULL, 1379 server.masterhost,server.masterport,REDIS_BIND_ADDR); 1380 if (fd == -1) { 1381 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s", 1382 strerror(errno)); 1383 return REDIS_ERR; 1384 } 1385 1386 if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == 1387 AE_ERR) 1388 { 1389 close(fd); 1390 redisLog(REDIS_WARNING,"Can't create readable event for SYNC"); 1391 return REDIS_ERR; 1392 } 1393 1394 server.repl_transfer_lastio = server.unixtime; 1395 server.repl_transfer_s = fd; 1396 server.repl_state = REDIS_REPL_CONNECTING; 1397 return REDIS_OK; 1398 } 1399 1400 /* This function can be called when a non blocking connection is currently 1401 * in progress to undo it. 
*/ 1402 void undoConnectWithMaster(void) { 1403 int fd = server.repl_transfer_s; 1404 1405 redisAssert(server.repl_state == REDIS_REPL_CONNECTING || 1406 server.repl_state == REDIS_REPL_RECEIVE_PONG); 1407 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1408 close(fd); 1409 server.repl_transfer_s = -1; 1410 server.repl_state = REDIS_REPL_CONNECT; 1411 } 1412 1413 /* This function aborts a non blocking replication attempt if there is one 1414 * in progress, by canceling the non-blocking connect attempt or 1415 * the initial bulk transfer. 1416 * 1417 * If there was a replication handshake in progress 1 is returned and 1418 * the replication state (server.repl_state) set to REDIS_REPL_CONNECT. 1419 * 1420 * Otherwise zero is returned and no operation is perforemd at all. */ 1421 int cancelReplicationHandshake(void) { 1422 if (server.repl_state == REDIS_REPL_TRANSFER) { 1423 replicationAbortSyncTransfer(); 1424 } else if (server.repl_state == REDIS_REPL_CONNECTING || 1425 server.repl_state == REDIS_REPL_RECEIVE_PONG) 1426 { 1427 undoConnectWithMaster(); 1428 } else { 1429 return 0; 1430 } 1431 return 1; 1432 } 1433 1434 /* Set replication to the specified master address and port. */ 1435 void replicationSetMaster(char *ip, int port) { 1436 sdsfree(server.masterhost); 1437 server.masterhost = sdsnew(ip); 1438 server.masterport = port; 1439 if (server.master) freeClient(server.master); 1440 disconnectSlaves(); /* Force our slaves to resync with us as well. */ 1441 replicationDiscardCachedMaster(); /* Don't try a PSYNC. */ 1442 freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ 1443 cancelReplicationHandshake(); 1444 server.repl_state = REDIS_REPL_CONNECT; 1445 server.master_repl_offset = 0; 1446 server.repl_down_since = 0; 1447 } 1448 1449 /* Cancel replication, setting the instance as a master itself. */ 1450 void replicationUnsetMaster(void) { 1451 if (server.masterhost == NULL) return; /* Nothing to do. 
*/ 1452 sdsfree(server.masterhost); 1453 server.masterhost = NULL; 1454 if (server.master) { 1455 if (listLength(server.slaves) == 0) { 1456 /* If this instance is turned into a master and there are no 1457 * slaves, it inherits the replication offset from the master. 1458 * Under certain conditions this makes replicas comparable by 1459 * replication offset to understand what is the most updated. */ 1460 server.master_repl_offset = server.master->reploff; 1461 freeReplicationBacklog(); 1462 } 1463 freeClient(server.master); 1464 } 1465 replicationDiscardCachedMaster(); 1466 cancelReplicationHandshake(); 1467 server.repl_state = REDIS_REPL_NONE; 1468 } 1469 1470 void slaveofCommand(redisClient *c) { 1471 /* SLAVEOF is not allowed in cluster mode as replication is automatically 1472 * configured using the current address of the master node. */ 1473 if (server.cluster_enabled) { 1474 addReplyError(c,"SLAVEOF not allowed in cluster mode."); 1475 return; 1476 } 1477 1478 /* The special host/port combination "NO" "ONE" turns the instance 1479 * into a master. Otherwise the new master address is set. */ 1480 if (!strcasecmp(c->argv[1]->ptr,"no") && 1481 !strcasecmp(c->argv[2]->ptr,"one")) { 1482 if (server.masterhost) { 1483 replicationUnsetMaster(); 1484 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)"); 1485 } 1486 } else { 1487 long port; 1488 1489 if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK)) 1490 return; 1491 1492 /* Check if we are already attached to the specified slave */ 1493 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) 1494 && server.masterport == port) { 1495 redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed."); 1496 addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); 1497 return; 1498 } 1499 /* There was no previous master or the user specified a different one, 1500 * we can continue. 
*/ 1501 replicationSetMaster(c->argv[1]->ptr, port); 1502 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)", 1503 server.masterhost, server.masterport); 1504 } 1505 addReply(c,shared.ok); 1506 } 1507 1508 /* ROLE command: provide information about the role of the instance 1509 * (master or slave) and additional information related to replication 1510 * in an easy to process format. */ 1511 void roleCommand(redisClient *c) { 1512 if (server.masterhost == NULL) { 1513 listIter li; 1514 listNode *ln; 1515 void *mbcount; 1516 int slaves = 0; 1517 1518 addReplyMultiBulkLen(c,3); 1519 addReplyBulkCBuffer(c,"master",6); 1520 addReplyLongLong(c,server.master_repl_offset); 1521 mbcount = addDeferredMultiBulkLength(c); 1522 listRewind(server.slaves,&li); 1523 while((ln = listNext(&li))) { 1524 redisClient *slave = ln->value; 1525 char ip[REDIS_IP_STR_LEN]; 1526 1527 if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1) continue; 1528 if (slave->replstate != REDIS_REPL_ONLINE) continue; 1529 addReplyMultiBulkLen(c,3); 1530 addReplyBulkCString(c,ip); 1531 addReplyBulkLongLong(c,slave->slave_listening_port); 1532 addReplyBulkLongLong(c,slave->repl_ack_off); 1533 slaves++; 1534 } 1535 setDeferredMultiBulkLength(c,mbcount,slaves); 1536 } else { 1537 char *slavestate = NULL; 1538 1539 addReplyMultiBulkLen(c,5); 1540 addReplyBulkCBuffer(c,"slave",5); 1541 addReplyBulkCString(c,server.masterhost); 1542 addReplyLongLong(c,server.masterport); 1543 switch(server.repl_state) { 1544 case REDIS_REPL_NONE: slavestate = "none"; break; 1545 case REDIS_REPL_CONNECT: slavestate = "connect"; break; 1546 case REDIS_REPL_CONNECTING: slavestate = "connecting"; break; 1547 case REDIS_REPL_RECEIVE_PONG: /* see next */ 1548 case REDIS_REPL_TRANSFER: slavestate = "sync"; break; 1549 case REDIS_REPL_CONNECTED: slavestate = "connected"; break; 1550 default: slavestate = "unknown"; break; 1551 } 1552 addReplyBulkCString(c,slavestate); 1553 addReplyLongLong(c,server.master ? 
server.master->reploff : -1); 1554 } 1555 } 1556 1557 /* Send a REPLCONF ACK command to the master to inform it about the current 1558 * processed offset. If we are not connected with a master, the command has 1559 * no effects. */ 1560 void replicationSendAck(void) { 1561 redisClient *c = server.master; 1562 1563 if (c != NULL) { 1564 c->flags |= REDIS_MASTER_FORCE_REPLY; 1565 addReplyMultiBulkLen(c,3); 1566 addReplyBulkCString(c,"REPLCONF"); 1567 addReplyBulkCString(c,"ACK"); 1568 addReplyBulkLongLong(c,c->reploff); 1569 c->flags &= ~REDIS_MASTER_FORCE_REPLY; 1570 } 1571 } 1572 1573 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */ 1574 1575 /* In order to implement partial synchronization we need to be able to cache 1576 * our master's client structure after a transient disconnection. 1577 * It is cached into server.cached_master and flushed away using the following 1578 * functions. */ 1579 1580 /* This function is called by freeClient() in order to cache the master 1581 * client structure instead of destryoing it. freeClient() will return 1582 * ASAP after this function returns, so every action needed to avoid problems 1583 * with a client that is really "suspended" has to be done by this function. 1584 * 1585 * The other functions that will deal with the cached master are: 1586 * 1587 * replicationDiscardCachedMaster() that will make sure to kill the client 1588 * as for some reason we don't want to use it in the future. 1589 * 1590 * replicationResurrectCachedMaster() that is used after a successful PSYNC 1591 * handshake in order to reactivate the cached master. 
1592 */ 1593 void replicationCacheMaster(redisClient *c) { 1594 listNode *ln; 1595 1596 redisAssert(server.master != NULL && server.cached_master == NULL); 1597 redisLog(REDIS_NOTICE,"Caching the disconnected master state."); 1598 1599 /* Remove from the list of clients, we don't want this client to be 1600 * listed by CLIENT LIST or processed in any way by batch operations. */ 1601 ln = listSearchKey(server.clients,c); 1602 redisAssert(ln != NULL); 1603 listDelNode(server.clients,ln); 1604 1605 /* Save the master. Server.master will be set to null later by 1606 * replicationHandleMasterDisconnection(). */ 1607 server.cached_master = server.master; 1608 1609 /* Remove the event handlers and close the socket. We'll later reuse 1610 * the socket of the new connection with the master during PSYNC. */ 1611 aeDeleteFileEvent(server.el,c->fd,AE_READABLE); 1612 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); 1613 close(c->fd); 1614 1615 /* Set fd to -1 so that we can safely call freeClient(c) later. */ 1616 c->fd = -1; 1617 1618 /* Invalidate the Peer ID cache. */ 1619 if (c->peerid) { 1620 sdsfree(c->peerid); 1621 c->peerid = NULL; 1622 } 1623 1624 /* Caching the master happens instead of the actual freeClient() call, 1625 * so make sure to adjust the replication state. This function will 1626 * also set server.master to NULL. */ 1627 replicationHandleMasterDisconnection(); 1628 } 1629 1630 /* Free a cached master, called when there are no longer the conditions for 1631 * a partial resync on reconnection. */ 1632 void replicationDiscardCachedMaster(void) { 1633 if (server.cached_master == NULL) return; 1634 1635 redisLog(REDIS_NOTICE,"Discarding previously cached master state."); 1636 server.cached_master->flags &= ~REDIS_MASTER; 1637 freeClient(server.cached_master); 1638 server.cached_master = NULL; 1639 } 1640 1641 /* Turn the cached master into the current master, using the file descriptor 1642 * passed as argument as the socket for the new master. 
1643 * 1644 * This function is called when successfully setup a partial resynchronization 1645 * so the stream of data that we'll receive will start from were this 1646 * master left. */ 1647 void replicationResurrectCachedMaster(int newfd) { 1648 server.master = server.cached_master; 1649 server.cached_master = NULL; 1650 server.master->fd = newfd; 1651 server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP); 1652 server.master->authenticated = 1; 1653 server.master->lastinteraction = server.unixtime; 1654 server.repl_state = REDIS_REPL_CONNECTED; 1655 1656 /* Re-add to the list of clients. */ 1657 listAddNodeTail(server.clients,server.master); 1658 if (aeCreateFileEvent(server.el, newfd, AE_READABLE, 1659 readQueryFromClient, server.master)) { 1660 redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); 1661 freeClientAsync(server.master); /* Close ASAP. */ 1662 } 1663 1664 /* We may also need to install the write handler as well if there is 1665 * pending data in the write buffers. */ 1666 if (server.master->bufpos || listLength(server.master->reply)) { 1667 if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, 1668 sendReplyToClient, server.master)) { 1669 redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); 1670 freeClientAsync(server.master); /* Close ASAP. */ 1671 } 1672 } 1673 } 1674 1675 /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */ 1676 1677 /* This function counts the number of slaves with lag <= min-slaves-max-lag. 1678 * If the option is active, the server will prevent writes if there are not 1679 * enough connected slaves with the specified lag (or less). 
*/
void refreshGoodSlavesCount(void) {
    listIter iter;
    listNode *node;
    int count = 0;

    /* Nothing to compute when the feature is not configured. */
    if (server.repl_min_slaves_to_write == 0 ||
        server.repl_min_slaves_max_lag == 0) return;

    listRewind(server.slaves,&iter);
    while ((node = listNext(&iter)) != NULL) {
        redisClient *slave = node->value;
        time_t lag = server.unixtime - slave->repl_ack_time;

        /* Only online slaves whose last ACK is recent enough are "good". */
        if (slave->replstate != REDIS_REPL_ONLINE) continue;
        if (lag <= server.repl_min_slaves_max_lag) count++;
    }
    server.repl_good_slaves_count = count;
}

/* ----------------------- REPLICATION SCRIPT CACHE --------------------------
 * The goal of this code is to keep track of scripts already sent to every
 * connected slave, in order to be able to replicate EVALSHA as it is without
 * translating it to EVAL every time it is possible.
 *
 * We use a capped collection implemented by a hash table for fast lookup
 * of scripts we can send as EVALSHA, plus a linked list that is used for
 * eviction of the oldest entry when the max number of items is reached.
 *
 * We don't care about taking a different cache for every different slave
 * since to fill the cache again is not very costly, the goal of this code
 * is to avoid that the same big script is transmitted a big number of times
 * per second wasting bandwidth and processor speed, but it is not a problem
 * if we need to rebuild the cache from scratch from time to time, every used
 * script will need to be transmitted a single time to reappear in the cache.
 *
 * This is how the system works:
 *
 * 1) Every time a new slave connects, we flush the whole script cache.
 * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
 *    trying to convert EVAL into EVALSHA specifically for slaves.
1720 * 3) Every time we trasmit a script as EVAL to the slaves, we also add the 1721 * corresponding SHA1 of the script into the cache as we are sure every 1722 * slave knows about the script starting from now. 1723 * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves 1724 * and at the same time flush the script cache. 1725 * 5) When the last slave disconnects, flush the cache. 1726 * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded 1727 * in the master sometimes. 1728 */ 1729 1730 /* Initialize the script cache, only called at startup. */ 1731 void replicationScriptCacheInit(void) { 1732 server.repl_scriptcache_size = 10000; 1733 server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL); 1734 server.repl_scriptcache_fifo = listCreate(); 1735 } 1736 1737 /* Empty the script cache. Should be called every time we are no longer sure 1738 * that every slave knows about all the scripts in our set, or when the 1739 * current AOF "context" is no longer aware of the script. In general we 1740 * should flush the cache: 1741 * 1742 * 1) Every time a new slave reconnects to this master and performs a 1743 * full SYNC (PSYNC does not require flushing). 1744 * 2) Every time an AOF rewrite is performed. 1745 * 3) Every time we are left without slaves at all, and AOF is off, in order 1746 * to reclaim otherwise unused memory. 1747 */ 1748 void replicationScriptCacheFlush(void) { 1749 dictEmpty(server.repl_scriptcache_dict,NULL); 1750 listRelease(server.repl_scriptcache_fifo); 1751 server.repl_scriptcache_fifo = listCreate(); 1752 } 1753 1754 /* Add an entry into the script cache, if we reach max number of entries the 1755 * oldest is removed from the list. */ 1756 void replicationScriptCacheAdd(sds sha1) { 1757 int retval; 1758 sds key = sdsdup(sha1); 1759 1760 /* Evict oldest. 
*/ 1761 if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size) 1762 { 1763 listNode *ln = listLast(server.repl_scriptcache_fifo); 1764 sds oldest = listNodeValue(ln); 1765 1766 retval = dictDelete(server.repl_scriptcache_dict,oldest); 1767 redisAssert(retval == DICT_OK); 1768 listDelNode(server.repl_scriptcache_fifo,ln); 1769 } 1770 1771 /* Add current. */ 1772 retval = dictAdd(server.repl_scriptcache_dict,key,NULL); 1773 listAddNodeHead(server.repl_scriptcache_fifo,key); 1774 redisAssert(retval == DICT_OK); 1775 } 1776 1777 /* Returns non-zero if the specified entry exists inside the cache, that is, 1778 * if all the slaves are aware of this script SHA1. */ 1779 int replicationScriptCacheExists(sds sha1) { 1780 return dictFind(server.repl_scriptcache_dict,sha1) != NULL; 1781 } 1782 1783 /* ----------------------- SYNCHRONOUS REPLICATION -------------------------- 1784 * Redis synchronous replication design can be summarized in points: 1785 * 1786 * - Redis masters have a global replication offset, used by PSYNC. 1787 * - Master increment the offset every time new commands are sent to slaves. 1788 * - Slaves ping back masters with the offset processed so far. 1789 * 1790 * So synchronous replication adds a new WAIT command in the form: 1791 * 1792 * WAIT <num_replicas> <milliseconds_timeout> 1793 * 1794 * That returns the number of replicas that processed the query when 1795 * we finally have at least num_replicas, or when the timeout was 1796 * reached. 1797 * 1798 * The command is implemented in this way: 1799 * 1800 * - Every time a client processes a command, we remember the replication 1801 * offset after sending that command to the slaves. 1802 * - When WAIT is called, we ask slaves to send an acknowledgement ASAP. 1803 * The client is blocked at the same time (see blocked.c). 
1804 * - Once we receive enough ACKs for a given offset or when the timeout 1805 * is reached, the WAIT command is unblocked and the reply sent to the 1806 * client. 1807 */ 1808 1809 /* This just set a flag so that we broadcast a REPLCONF GETACK command 1810 * to all the slaves in the beforeSleep() function. Note that this way 1811 * we "group" all the clients that want to wait for synchronouns replication 1812 * in a given event loop iteration, and send a single GETACK for them all. */ 1813 void replicationRequestAckFromSlaves(void) { 1814 server.get_ack_from_slaves = 1; 1815 } 1816 1817 /* Return the number of slaves that already acknowledged the specified 1818 * replication offset. */ 1819 int replicationCountAcksByOffset(long long offset) { 1820 listIter li; 1821 listNode *ln; 1822 int count = 0; 1823 1824 listRewind(server.slaves,&li); 1825 while((ln = listNext(&li))) { 1826 redisClient *slave = ln->value; 1827 1828 if (slave->replstate != REDIS_REPL_ONLINE) continue; 1829 if (slave->repl_ack_off >= offset) count++; 1830 } 1831 return count; 1832 } 1833 1834 /* WAIT for N replicas to acknowledge the processing of our latest 1835 * write command (and all the previous commands). */ 1836 void waitCommand(redisClient *c) { 1837 mstime_t timeout; 1838 long numreplicas, ackreplicas; 1839 long long offset = c->woff; 1840 1841 /* Argument parsing. */ 1842 if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != REDIS_OK) 1843 return; 1844 if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS) 1845 != REDIS_OK) return; 1846 1847 /* First try without blocking at all. */ 1848 ackreplicas = replicationCountAcksByOffset(c->woff); 1849 if (ackreplicas >= numreplicas || c->flags & REDIS_MULTI) { 1850 addReplyLongLong(c,ackreplicas); 1851 return; 1852 } 1853 1854 /* Otherwise block the client and put it into our list of clients 1855 * waiting for ack from slaves. 
*/ 1856 c->bpop.timeout = timeout; 1857 c->bpop.reploffset = offset; 1858 c->bpop.numreplicas = numreplicas; 1859 listAddNodeTail(server.clients_waiting_acks,c); 1860 blockClient(c,REDIS_BLOCKED_WAIT); 1861 1862 /* Make sure that the server will send an ACK request to all the slaves 1863 * before returning to the event loop. */ 1864 replicationRequestAckFromSlaves(); 1865 } 1866 1867 /* This is called by unblockClient() to perform the blocking op type 1868 * specific cleanup. We just remove the client from the list of clients 1869 * waiting for replica acks. Never call it directly, call unblockClient() 1870 * instead. */ 1871 void unblockClientWaitingReplicas(redisClient *c) { 1872 listNode *ln = listSearchKey(server.clients_waiting_acks,c); 1873 redisAssert(ln != NULL); 1874 listDelNode(server.clients_waiting_acks,ln); 1875 } 1876 1877 /* Check if there are clients blocked in WAIT that can be unblocked since 1878 * we received enough ACKs from slaves. */ 1879 void processClientsWaitingReplicas(void) { 1880 long long last_offset = 0; 1881 int last_numreplicas = 0; 1882 1883 listIter li; 1884 listNode *ln; 1885 1886 listRewind(server.clients_waiting_acks,&li); 1887 while((ln = listNext(&li))) { 1888 redisClient *c = ln->value; 1889 1890 /* Every time we find a client that is satisfied for a given 1891 * offset and number of replicas, we remember it so the next client 1892 * may be unblocked without calling replicationCountAcksByOffset() 1893 * if the requested offset / replicas were equal or less. 
*/ 1894 if (last_offset && last_offset > c->bpop.reploffset && 1895 last_numreplicas > c->bpop.numreplicas) 1896 { 1897 unblockClient(c); 1898 addReplyLongLong(c,last_numreplicas); 1899 } else { 1900 int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset); 1901 1902 if (numreplicas >= c->bpop.numreplicas) { 1903 last_offset = c->bpop.reploffset; 1904 last_numreplicas = numreplicas; 1905 unblockClient(c); 1906 addReplyLongLong(c,numreplicas); 1907 } 1908 } 1909 } 1910 } 1911 1912 /* Return the slave replication offset for this instance, that is 1913 * the offset for which we already processed the master replication stream. */ 1914 long long replicationGetSlaveOffset(void) { 1915 long long offset = 0; 1916 1917 if (server.masterhost != NULL) { 1918 if (server.master) { 1919 offset = server.master->reploff; 1920 } else if (server.cached_master) { 1921 offset = server.cached_master->reploff; 1922 } 1923 } 1924 /* offset may be -1 when the master does not support it at all, however 1925 * this function is designed to return an offset that can express the 1926 * amount of data processed by the master, so we return a positive 1927 * integer. */ 1928 if (offset < 0) offset = 0; 1929 return offset; 1930 } 1931 1932 /* --------------------------- REPLICATION CRON ---------------------------- */ 1933 1934 /* Replication cron function, called 1 time per second. */ 1935 void replicationCron(void) { 1936 /* Non blocking connection timeout? */ 1937 if (server.masterhost && 1938 (server.repl_state == REDIS_REPL_CONNECTING || 1939 server.repl_state == REDIS_REPL_RECEIVE_PONG) && 1940 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) 1941 { 1942 redisLog(REDIS_WARNING,"Timeout connecting to the MASTER..."); 1943 undoConnectWithMaster(); 1944 } 1945 1946 /* Bulk transfer I/O timeout? 
*/ 1947 if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER && 1948 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) 1949 { 1950 redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value."); 1951 replicationAbortSyncTransfer(); 1952 } 1953 1954 /* Timed out master when we are an already connected slave? */ 1955 if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED && 1956 (time(NULL)-server.master->lastinteraction) > server.repl_timeout) 1957 { 1958 redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received..."); 1959 freeClient(server.master); 1960 } 1961 1962 /* Check if we should connect to a MASTER */ 1963 if (server.repl_state == REDIS_REPL_CONNECT) { 1964 redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d", 1965 server.masterhost, server.masterport); 1966 if (connectWithMaster() == REDIS_OK) { 1967 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started"); 1968 } 1969 } 1970 1971 /* Send ACK to master from time to time. 1972 * Note that we do not send periodic acks to masters that don't 1973 * support PSYNC and replication offsets. */ 1974 if (server.masterhost && server.master && 1975 !(server.master->flags & REDIS_PRE_PSYNC)) 1976 replicationSendAck(); 1977 1978 /* If we have attached slaves, PING them from time to time. 1979 * So slaves can implement an explicit timeout to masters, and will 1980 * be able to detect a link disconnection even if the TCP connection 1981 * will not actually go down. 
*/ 1982 if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) { 1983 listIter li; 1984 listNode *ln; 1985 robj *ping_argv[1]; 1986 1987 /* First, send PING */ 1988 ping_argv[0] = createStringObject("PING",4); 1989 replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); 1990 decrRefCount(ping_argv[0]); 1991 1992 /* Second, send a newline to all the slaves in pre-synchronization 1993 * stage, that is, slaves waiting for the master to create the RDB file. 1994 * The newline will be ignored by the slave but will refresh the 1995 * last-io timer preventing a timeout. */ 1996 listRewind(server.slaves,&li); 1997 while((ln = listNext(&li))) { 1998 redisClient *slave = ln->value; 1999 2000 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START || 2001 (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END && 2002 server.rdb_child_type != REDIS_RDB_CHILD_TYPE_SOCKET)) 2003 { 2004 if (write(slave->fd, "\n", 1) == -1) { 2005 /* Don't worry, it's just a ping. */ 2006 } 2007 } 2008 } 2009 } 2010 2011 /* Disconnect timedout slaves. */ 2012 if (listLength(server.slaves)) { 2013 listIter li; 2014 listNode *ln; 2015 2016 listRewind(server.slaves,&li); 2017 while((ln = listNext(&li))) { 2018 redisClient *slave = ln->value; 2019 2020 if (slave->replstate != REDIS_REPL_ONLINE) continue; 2021 if (slave->flags & REDIS_PRE_PSYNC) continue; 2022 if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) 2023 { 2024 redisLog(REDIS_WARNING, "Disconnecting timedout slave: %s", 2025 replicationGetSlaveName(slave)); 2026 freeClient(slave); 2027 } 2028 } 2029 } 2030 2031 /* If we have no attached slaves and there is a replication backlog 2032 * using memory, free it after some (configured) time. 
*/ 2033 if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && 2034 server.repl_backlog) 2035 { 2036 time_t idle = server.unixtime - server.repl_no_slaves_since; 2037 2038 if (idle > server.repl_backlog_time_limit) { 2039 freeReplicationBacklog(); 2040 redisLog(REDIS_NOTICE, 2041 "Replication backlog freed after %d seconds " 2042 "without connected slaves.", 2043 (int) server.repl_backlog_time_limit); 2044 } 2045 } 2046 2047 /* If AOF is disabled and we no longer have attached slaves, we can 2048 * free our Replication Script Cache as there is no need to propagate 2049 * EVALSHA at all. */ 2050 if (listLength(server.slaves) == 0 && 2051 server.aof_state == REDIS_AOF_OFF && 2052 listLength(server.repl_scriptcache_fifo) != 0) 2053 { 2054 replicationScriptCacheFlush(); 2055 } 2056 2057 /* If we are using diskless replication and there are slaves waiting 2058 * in WAIT_BGSAVE_START state, check if enough seconds elapsed and 2059 * start a BGSAVE. 2060 * 2061 * This code is also useful to trigger a BGSAVE if the diskless 2062 * replication was turned off with CONFIG SET, while there were already 2063 * slaves in WAIT_BGSAVE_START state. */ 2064 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { 2065 time_t idle, max_idle = 0; 2066 int slaves_waiting = 0; 2067 listNode *ln; 2068 listIter li; 2069 2070 listRewind(server.slaves,&li); 2071 while((ln = listNext(&li))) { 2072 redisClient *slave = ln->value; 2073 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { 2074 idle = server.unixtime - slave->lastinteraction; 2075 if (idle > max_idle) max_idle = idle; 2076 slaves_waiting++; 2077 } 2078 } 2079 2080 if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) { 2081 /* Start a BGSAVE. Usually with socket target, or with disk target 2082 * if there was a recent socket -> disk config change. */ 2083 if (startBgsaveForReplication() == REDIS_OK) { 2084 /* It started! 
We need to change the state of slaves 2085 * from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case 2086 * the current target is disk. Otherwise it was already done 2087 * by rdbSaveToSlavesSockets() which is called by 2088 * startBgsaveForReplication(). */ 2089 listRewind(server.slaves,&li); 2090 while((ln = listNext(&li))) { 2091 redisClient *slave = ln->value; 2092 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) 2093 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; 2094 } 2095 } 2096 } 2097 } 2098 2099 /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ 2100 refreshGoodSlavesCount(); 2101 } 2102