1 /* Asynchronous replication implementation. 2 * 3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * * Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * * Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * * Neither the name of Redis nor the names of its contributors may be used 15 * to endorse or promote products derived from this software without 16 * specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 
29 */ 30 31 32 #include "redis.h" 33 34 #include <sys/time.h> 35 #include <unistd.h> 36 #include <fcntl.h> 37 #include <sys/socket.h> 38 #include <sys/stat.h> 39 40 void replicationDiscardCachedMaster(void); 41 void replicationResurrectCachedMaster(int newfd); 42 void replicationSendAck(void); 43 void putSlaveOnline(redisClient *slave); 44 45 /* --------------------------- Utility functions ---------------------------- */ 46 47 /* Return the pointer to a string representing the slave ip:listening_port 48 * pair. Mostly useful for logging, since we want to log a slave using its 49 * IP address and it's listening port which is more clear for the user, for 50 * example: "Closing connection with slave 10.1.2.3:6380". */ 51 char *replicationGetSlaveName(redisClient *c) { 52 static char buf[REDIS_PEER_ID_LEN]; 53 char ip[REDIS_IP_STR_LEN]; 54 55 ip[0] = '\0'; 56 buf[0] = '\0'; 57 if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) { 58 if (c->slave_listening_port) 59 snprintf(buf,sizeof(buf),"%s:%d",ip,c->slave_listening_port); 60 else 61 snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip); 62 } else { 63 snprintf(buf,sizeof(buf),"client id #%llu", 64 (unsigned long long) c->id); 65 } 66 return buf; 67 } 68 69 /* ---------------------------------- MASTER -------------------------------- */ 70 71 void createReplicationBacklog(void) { 72 redisAssert(server.repl_backlog == NULL); 73 server.repl_backlog = zmalloc(server.repl_backlog_size); 74 server.repl_backlog_histlen = 0; 75 server.repl_backlog_idx = 0; 76 /* When a new backlog buffer is created, we increment the replication 77 * offset by one to make sure we'll not be able to PSYNC with any 78 * previous slave. This is needed because we avoid incrementing the 79 * master_repl_offset if no backlog exists nor slaves are attached. 
*/ 80 server.master_repl_offset++; 81 82 /* We don't have any data inside our buffer, but virtually the first 83 * byte we have is the next byte that will be generated for the 84 * replication stream. */ 85 server.repl_backlog_off = server.master_repl_offset+1; 86 } 87 88 /* This function is called when the user modifies the replication backlog 89 * size at runtime. It is up to the function to both update the 90 * server.repl_backlog_size and to resize the buffer and setup it so that 91 * it contains the same data as the previous one (possibly less data, but 92 * the most recent bytes, or the same data and more free space in case the 93 * buffer is enlarged). */ 94 void resizeReplicationBacklog(long long newsize) { 95 if (newsize < REDIS_REPL_BACKLOG_MIN_SIZE) 96 newsize = REDIS_REPL_BACKLOG_MIN_SIZE; 97 if (server.repl_backlog_size == newsize) return; 98 99 server.repl_backlog_size = newsize; 100 if (server.repl_backlog != NULL) { 101 /* What we actually do is to flush the old buffer and realloc a new 102 * empty one. It will refill with new data incrementally. 103 * The reason is that copying a few gigabytes adds latency and even 104 * worse often we need to alloc additional space before freeing the 105 * old buffer. */ 106 zfree(server.repl_backlog); 107 server.repl_backlog = zmalloc(server.repl_backlog_size); 108 server.repl_backlog_histlen = 0; 109 server.repl_backlog_idx = 0; 110 /* Next byte we have is... the next since the buffer is empty. */ 111 server.repl_backlog_off = server.master_repl_offset+1; 112 } 113 } 114 115 void freeReplicationBacklog(void) { 116 redisAssert(listLength(server.slaves) == 0); 117 zfree(server.repl_backlog); 118 server.repl_backlog = NULL; 119 } 120 121 /* Add data to the replication backlog. 122 * This function also increments the global replication offset stored at 123 * server.master_repl_offset, because there is no case where we want to feed 124 * the backlog without incrementing the buffer. 
*/ 125 void feedReplicationBacklog(void *ptr, size_t len) { 126 unsigned char *p = ptr; 127 128 server.master_repl_offset += len; 129 130 /* This is a circular buffer, so write as much data we can at every 131 * iteration and rewind the "idx" index if we reach the limit. */ 132 while(len) { 133 size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; 134 if (thislen > len) thislen = len; 135 memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); 136 server.repl_backlog_idx += thislen; 137 if (server.repl_backlog_idx == server.repl_backlog_size) 138 server.repl_backlog_idx = 0; 139 len -= thislen; 140 p += thislen; 141 server.repl_backlog_histlen += thislen; 142 } 143 if (server.repl_backlog_histlen > server.repl_backlog_size) 144 server.repl_backlog_histlen = server.repl_backlog_size; 145 /* Set the offset of the first byte we have in the backlog. */ 146 server.repl_backlog_off = server.master_repl_offset - 147 server.repl_backlog_histlen + 1; 148 } 149 150 /* Wrapper for feedReplicationBacklog() that takes Redis string objects 151 * as input. */ 152 void feedReplicationBacklogWithObject(robj *o) { 153 char llstr[REDIS_LONGSTR_SIZE]; 154 void *p; 155 size_t len; 156 157 if (o->encoding == REDIS_ENCODING_INT) { 158 len = ll2string(llstr,sizeof(llstr),(long)o->ptr); 159 p = llstr; 160 } else { 161 len = sdslen(o->ptr); 162 p = o->ptr; 163 } 164 feedReplicationBacklog(p,len); 165 } 166 167 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { 168 listNode *ln; 169 listIter li; 170 int j, len; 171 char llstr[REDIS_LONGSTR_SIZE]; 172 173 /* If there aren't slaves, and there is no backlog buffer to populate, 174 * we can return ASAP. */ 175 if (server.repl_backlog == NULL && listLength(slaves) == 0) return; 176 177 /* We can't have slaves attached and no backlog. */ 178 redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); 179 180 /* Send SELECT command to every slave if needed. 
*/ 181 if (server.slaveseldb != dictid) { 182 robj *selectcmd; 183 184 /* For a few DBs we have pre-computed SELECT command. */ 185 if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) { 186 selectcmd = shared.select[dictid]; 187 } else { 188 int dictid_len; 189 190 dictid_len = ll2string(llstr,sizeof(llstr),dictid); 191 selectcmd = createObject(REDIS_STRING, 192 sdscatprintf(sdsempty(), 193 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", 194 dictid_len, llstr)); 195 } 196 197 /* Add the SELECT command into the backlog. */ 198 if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); 199 200 /* Send it to slaves. */ 201 listRewind(slaves,&li); 202 while((ln = listNext(&li))) { 203 redisClient *slave = ln->value; 204 addReply(slave,selectcmd); 205 } 206 207 if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS) 208 decrRefCount(selectcmd); 209 } 210 server.slaveseldb = dictid; 211 212 /* Write the command to the replication backlog if any. */ 213 if (server.repl_backlog) { 214 char aux[REDIS_LONGSTR_SIZE+3]; 215 216 /* Add the multi bulk reply length. */ 217 aux[0] = '*'; 218 len = ll2string(aux+1,sizeof(aux)-1,argc); 219 aux[len+1] = '\r'; 220 aux[len+2] = '\n'; 221 feedReplicationBacklog(aux,len+3); 222 223 for (j = 0; j < argc; j++) { 224 long objlen = stringObjectLen(argv[j]); 225 226 /* We need to feed the buffer with the object as a bulk reply 227 * not just as a plain string, so create the $..CRLF payload len 228 * and add the final CRLF */ 229 aux[0] = '$'; 230 len = ll2string(aux+1,sizeof(aux)-1,objlen); 231 aux[len+1] = '\r'; 232 aux[len+2] = '\n'; 233 feedReplicationBacklog(aux,len+3); 234 feedReplicationBacklogWithObject(argv[j]); 235 feedReplicationBacklog(aux+len+1,2); 236 } 237 } 238 239 /* Write the command to every slave. 
*/ 240 listRewind(server.slaves,&li); 241 while((ln = listNext(&li))) { 242 redisClient *slave = ln->value; 243 244 /* Don't feed slaves that are still waiting for BGSAVE to start */ 245 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue; 246 247 /* Feed slaves that are waiting for the initial SYNC (so these commands 248 * are queued in the output buffer until the initial SYNC completes), 249 * or are already in sync with the master. */ 250 251 /* Add the multi bulk length. */ 252 addReplyMultiBulkLen(slave,argc); 253 254 /* Finally any additional argument that was not stored inside the 255 * static buffer if any (from j to argc). */ 256 for (j = 0; j < argc; j++) 257 addReplyBulk(slave,argv[j]); 258 } 259 } 260 261 void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) { 262 listNode *ln; 263 listIter li; 264 int j; 265 sds cmdrepr = sdsnew("+"); 266 robj *cmdobj; 267 struct timeval tv; 268 269 gettimeofday(&tv,NULL); 270 cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); 271 if (c->flags & REDIS_LUA_CLIENT) { 272 cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); 273 } else if (c->flags & REDIS_UNIX_SOCKET) { 274 cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); 275 } else { 276 cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c)); 277 } 278 279 for (j = 0; j < argc; j++) { 280 if (argv[j]->encoding == REDIS_ENCODING_INT) { 281 cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr); 282 } else { 283 cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, 284 sdslen(argv[j]->ptr)); 285 } 286 if (j != argc-1) 287 cmdrepr = sdscatlen(cmdrepr," ",1); 288 } 289 cmdrepr = sdscatlen(cmdrepr,"\r\n",2); 290 cmdobj = createObject(REDIS_STRING,cmdrepr); 291 292 listRewind(monitors,&li); 293 while((ln = listNext(&li))) { 294 redisClient *monitor = ln->value; 295 addReply(monitor,cmdobj); 296 } 297 decrRefCount(cmdobj); 298 } 299 300 /* Feed the slave 
'c' with the replication backlog starting from the 301 * specified 'offset' up to the end of the backlog. */ 302 long long addReplyReplicationBacklog(redisClient *c, long long offset) { 303 long long j, skip, len; 304 305 redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset); 306 307 if (server.repl_backlog_histlen == 0) { 308 redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero"); 309 return 0; 310 } 311 312 redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld", 313 server.repl_backlog_size); 314 redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld", 315 server.repl_backlog_off); 316 redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld", 317 server.repl_backlog_histlen); 318 redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld", 319 server.repl_backlog_idx); 320 321 /* Compute the amount of bytes we need to discard. */ 322 skip = offset - server.repl_backlog_off; 323 redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip); 324 325 /* Point j to the oldest byte, that is actaully our 326 * server.repl_backlog_off byte. */ 327 j = (server.repl_backlog_idx + 328 (server.repl_backlog_size-server.repl_backlog_histlen)) % 329 server.repl_backlog_size; 330 redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j); 331 332 /* Discard the amount of data to seek to the specified 'offset'. */ 333 j = (j + skip) % server.repl_backlog_size; 334 335 /* Feed slave with data. Since it is a circular buffer we have to 336 * split the reply in two parts if we are cross-boundary. */ 337 len = server.repl_backlog_histlen - skip; 338 redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len); 339 while(len) { 340 long long thislen = 341 ((server.repl_backlog_size - j) < len) ? 
342 (server.repl_backlog_size - j) : len; 343 344 redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen); 345 addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); 346 len -= thislen; 347 j = 0; 348 } 349 return server.repl_backlog_histlen - skip; 350 } 351 352 /* This function handles the PSYNC command from the point of view of a 353 * master receiving a request for partial resynchronization. 354 * 355 * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed 356 * with the usual full resync. */ 357 int masterTryPartialResynchronization(redisClient *c) { 358 long long psync_offset, psync_len; 359 char *master_runid = c->argv[1]->ptr; 360 char buf[128]; 361 int buflen; 362 363 /* Is the runid of this master the same advertised by the wannabe slave 364 * via PSYNC? If runid changed this master is a different instance and 365 * there is no way to continue. */ 366 if (strcasecmp(master_runid, server.runid)) { 367 /* Run id "?" is used by slaves that want to force a full resync. */ 368 if (master_runid[0] != '?') { 369 redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: " 370 "Runid mismatch (Client asked for runid '%s', my runid is '%s')", 371 master_runid, server.runid); 372 } else { 373 redisLog(REDIS_NOTICE,"Full resync requested by slave %s", 374 replicationGetSlaveName(c)); 375 } 376 goto need_full_resync; 377 } 378 379 /* We still have the data our slave is asking for? 
*/ 380 if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != 381 REDIS_OK) goto need_full_resync; 382 if (!server.repl_backlog || 383 psync_offset < server.repl_backlog_off || 384 psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) 385 { 386 redisLog(REDIS_NOTICE, 387 "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset); 388 if (psync_offset > server.master_repl_offset) { 389 redisLog(REDIS_WARNING, 390 "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); 391 } 392 goto need_full_resync; 393 } 394 395 /* If we reached this point, we are able to perform a partial resync: 396 * 1) Set client state to make it a slave. 397 * 2) Inform the client we can continue with +CONTINUE 398 * 3) Send the backlog data (from the offset to the end) to the slave. */ 399 c->flags |= REDIS_SLAVE; 400 c->replstate = REDIS_REPL_ONLINE; 401 c->repl_ack_time = server.unixtime; 402 c->repl_put_online_on_ack = 0; 403 listAddNodeTail(server.slaves,c); 404 /* We can't use the connection buffers since they are used to accumulate 405 * new commands at this stage. But we are sure the socket send buffer is 406 * empty so this write will never fail actually. */ 407 buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); 408 if (write(c->fd,buf,buflen) != buflen) { 409 freeClientAsync(c); 410 return REDIS_OK; 411 } 412 psync_len = addReplyReplicationBacklog(c,psync_offset); 413 redisLog(REDIS_NOTICE, 414 "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", 415 replicationGetSlaveName(c), 416 psync_len, psync_offset); 417 /* Note that we don't need to set the selected DB at server.slaveseldb 418 * to -1 to force the master to emit SELECT, since the slave already 419 * has this state from the previous connection with the master. 
*/ 420 421 refreshGoodSlavesCount(); 422 return REDIS_OK; /* The caller can return, no full resync needed. */ 423 424 need_full_resync: 425 /* We need a full resync for some reason... notify the client. */ 426 psync_offset = server.master_repl_offset; 427 /* Add 1 to psync_offset if it the replication backlog does not exists 428 * as when it will be created later we'll increment the offset by one. */ 429 if (server.repl_backlog == NULL) psync_offset++; 430 /* Again, we can't use the connection buffers (see above). */ 431 buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", 432 server.runid,psync_offset); 433 if (write(c->fd,buf,buflen) != buflen) { 434 freeClientAsync(c); 435 return REDIS_OK; 436 } 437 return REDIS_ERR; 438 } 439 440 /* Start a BGSAVE for replication goals, which is, selecting the disk or 441 * socket target depending on the configuration, and making sure that 442 * the script cache is flushed before to start. 443 * 444 * Returns REDIS_OK on success or REDIS_ERR otherwise. */ 445 int startBgsaveForReplication(void) { 446 int retval; 447 448 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s", 449 server.repl_diskless_sync ? "slaves sockets" : "disk"); 450 451 if (server.repl_diskless_sync) 452 retval = rdbSaveToSlavesSockets(); 453 else 454 retval = rdbSaveBackground(server.rdb_filename); 455 456 /* Flush the script cache, since we need that slave differences are 457 * accumulated without requiring slaves to match our cached scripts. */ 458 if (retval == REDIS_OK) replicationScriptCacheFlush(); 459 return retval; 460 } 461 462 /* SYNC and PSYNC command implemenation. */ 463 void syncCommand(redisClient *c) { 464 /* ignore SYNC if already slave or in monitor mode */ 465 if (c->flags & REDIS_SLAVE) return; 466 467 /* Refuse SYNC requests if we are a slave but the link with our master 468 * is not ok... 
*/ 469 if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) { 470 addReplyError(c,"Can't SYNC while not connected with my master"); 471 return; 472 } 473 474 /* SYNC can't be issued when the server has pending data to send to 475 * the client about already issued commands. We need a fresh reply 476 * buffer registering the differences between the BGSAVE and the current 477 * dataset, so that we can copy to other slaves if needed. */ 478 if (listLength(c->reply) != 0 || c->bufpos != 0) { 479 addReplyError(c,"SYNC and PSYNC are invalid with pending output"); 480 return; 481 } 482 483 redisLog(REDIS_NOTICE,"Slave %s asks for synchronization", 484 replicationGetSlaveName(c)); 485 486 /* Try a partial resynchronization if this is a PSYNC command. 487 * If it fails, we continue with usual full resynchronization, however 488 * when this happens masterTryPartialResynchronization() already 489 * replied with: 490 * 491 * +FULLRESYNC <runid> <offset> 492 * 493 * So the slave knows the new runid and offset to try a PSYNC later 494 * if the connection with the master is lost. */ 495 if (!strcasecmp(c->argv[0]->ptr,"psync")) { 496 if (masterTryPartialResynchronization(c) == REDIS_OK) { 497 server.stat_sync_partial_ok++; 498 return; /* No full resync needed, return. */ 499 } else { 500 char *master_runid = c->argv[1]->ptr; 501 502 /* Increment stats for failed PSYNCs, but only if the 503 * runid is not "?", as this is used by slaves to force a full 504 * resync on purpose when they are not albe to partially 505 * resync. */ 506 if (master_runid[0] != '?') server.stat_sync_partial_err++; 507 } 508 } else { 509 /* If a slave uses SYNC, we are dealing with an old implementation 510 * of the replication protocol (like redis-cli --slave). Flag the client 511 * so that we don't expect to receive REPLCONF ACK feedbacks. */ 512 c->flags |= REDIS_PRE_PSYNC; 513 } 514 515 /* Full resynchronization. 
*/ 516 server.stat_sync_full++; 517 518 /* Here we need to check if there is a background saving operation 519 * in progress, or if it is required to start one */ 520 if (server.rdb_child_pid != -1 && 521 server.rdb_child_type == REDIS_RDB_CHILD_TYPE_DISK) 522 { 523 /* Ok a background save is in progress. Let's check if it is a good 524 * one for replication, i.e. if there is another slave that is 525 * registering differences since the server forked to save. */ 526 redisClient *slave; 527 listNode *ln; 528 listIter li; 529 530 listRewind(server.slaves,&li); 531 while((ln = listNext(&li))) { 532 slave = ln->value; 533 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break; 534 } 535 if (ln) { 536 /* Perfect, the server is already registering differences for 537 * another slave. Set the right state, and copy the buffer. */ 538 copyClientOutputBuffer(c,slave); 539 c->replstate = REDIS_REPL_WAIT_BGSAVE_END; 540 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); 541 } else { 542 /* No way, we need to wait for the next BGSAVE in order to 543 * register differences. */ 544 c->replstate = REDIS_REPL_WAIT_BGSAVE_START; 545 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); 546 } 547 } else if (server.rdb_child_pid != -1 && 548 server.rdb_child_type == REDIS_RDB_CHILD_TYPE_SOCKET) 549 { 550 /* There is an RDB child process but it is writing directly to 551 * children sockets. We need to wait for the next BGSAVE 552 * in order to synchronize. */ 553 c->replstate = REDIS_REPL_WAIT_BGSAVE_START; 554 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); 555 } else { 556 if (server.repl_diskless_sync) { 557 /* Diskless replication RDB child is created inside 558 * replicationCron() since we want to delay its start a 559 * few seconds to wait for more slaves to arrive. 
*/ 560 c->replstate = REDIS_REPL_WAIT_BGSAVE_START; 561 if (server.repl_diskless_sync_delay) 562 redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC"); 563 } else { 564 /* Ok we don't have a BGSAVE in progress, let's start one. */ 565 if (startBgsaveForReplication() != REDIS_OK) { 566 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); 567 addReplyError(c,"Unable to perform background save"); 568 return; 569 } 570 c->replstate = REDIS_REPL_WAIT_BGSAVE_END; 571 } 572 } 573 574 if (server.repl_disable_tcp_nodelay) 575 anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */ 576 c->repldbfd = -1; 577 c->flags |= REDIS_SLAVE; 578 server.slaveseldb = -1; /* Force to re-emit the SELECT command. */ 579 listAddNodeTail(server.slaves,c); 580 if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) 581 createReplicationBacklog(); 582 return; 583 } 584 585 /* REPLCONF <option> <value> <option> <value> ... 586 * This command is used by a slave in order to configure the replication 587 * process before starting it with the SYNC command. 588 * 589 * Currently the only use of this command is to communicate to the master 590 * what is the listening port of the Slave redis instance, so that the 591 * master can accurately list slaves and their listening ports in 592 * the INFO output. 593 * 594 * In the future the same command can be used in order to configure 595 * the replication to initiate an incremental replication instead of a 596 * full resync. */ 597 void replconfCommand(redisClient *c) { 598 int j; 599 600 if ((c->argc % 2) == 0) { 601 /* Number of arguments must be odd to make sure that every 602 * option has a corresponding value. */ 603 addReply(c,shared.syntaxerr); 604 return; 605 } 606 607 /* Process every option-value pair. 
*/ 608 for (j = 1; j < c->argc; j+=2) { 609 if (!strcasecmp(c->argv[j]->ptr,"listening-port")) { 610 long port; 611 612 if ((getLongFromObjectOrReply(c,c->argv[j+1], 613 &port,NULL) != REDIS_OK)) 614 return; 615 c->slave_listening_port = port; 616 } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { 617 /* REPLCONF ACK is used by slave to inform the master the amount 618 * of replication stream that it processed so far. It is an 619 * internal only command that normal clients should never use. */ 620 long long offset; 621 622 if (!(c->flags & REDIS_SLAVE)) return; 623 if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK)) 624 return; 625 if (offset > c->repl_ack_off) 626 c->repl_ack_off = offset; 627 c->repl_ack_time = server.unixtime; 628 /* If this was a diskless replication, we need to really put 629 * the slave online when the first ACK is received (which 630 * confirms slave is online and ready to get more data). */ 631 if (c->repl_put_online_on_ack && c->replstate == REDIS_REPL_ONLINE) 632 putSlaveOnline(c); 633 /* Note: this command does not reply anything! */ 634 return; 635 } else if (!strcasecmp(c->argv[j]->ptr,"getack")) { 636 /* REPLCONF GETACK is used in order to request an ACK ASAP 637 * to the slave. */ 638 if (server.masterhost && server.master) replicationSendAck(); 639 /* Note: this command does not reply anything! */ 640 } else { 641 addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", 642 (char*)c->argv[j]->ptr); 643 return; 644 } 645 } 646 addReply(c,shared.ok); 647 } 648 649 /* This function puts a slave in the online state, and should be called just 650 * after a slave received the RDB file for the initial synchronization, and 651 * we are finally ready to send the incremental stream of commands. 652 * 653 * It does a few things: 654 * 655 * 1) Put the slave in ONLINE state. 
656 * 2) Make sure the writable event is re-installed, since calling the SYNC 657 * command disables it, so that we can accumulate output buffer without 658 * sending it to the slave. 659 * 3) Update the count of good slaves. */ 660 void putSlaveOnline(redisClient *slave) { 661 slave->replstate = REDIS_REPL_ONLINE; 662 slave->repl_put_online_on_ack = 0; 663 slave->repl_ack_time = server.unixtime; 664 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, 665 sendReplyToClient, slave) == AE_ERR) { 666 redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno)); 667 freeClient(slave); 668 return; 669 } 670 refreshGoodSlavesCount(); 671 redisLog(REDIS_NOTICE,"Synchronization with slave %s succeeded", 672 replicationGetSlaveName(slave)); 673 } 674 675 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { 676 redisClient *slave = privdata; 677 REDIS_NOTUSED(el); 678 REDIS_NOTUSED(mask); 679 char buf[REDIS_IOBUF_LEN]; 680 ssize_t nwritten, buflen; 681 682 /* Before sending the RDB file, we send the preamble as configured by the 683 * replication process. Currently the preamble is just the bulk count of 684 * the file in the form "$<length>\r\n". */ 685 if (slave->replpreamble) { 686 nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble)); 687 if (nwritten == -1) { 688 redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s", 689 strerror(errno)); 690 freeClient(slave); 691 return; 692 } 693 sdsrange(slave->replpreamble,nwritten,-1); 694 if (sdslen(slave->replpreamble) == 0) { 695 sdsfree(slave->replpreamble); 696 slave->replpreamble = NULL; 697 /* fall through sending data. */ 698 } else { 699 return; 700 } 701 } 702 703 /* If the preamble was already transfered, send the RDB bulk data. 
*/ 704 lseek(slave->repldbfd,slave->repldboff,SEEK_SET); 705 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); 706 if (buflen <= 0) { 707 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s", 708 (buflen == 0) ? "premature EOF" : strerror(errno)); 709 freeClient(slave); 710 return; 711 } 712 if ((nwritten = write(fd,buf,buflen)) == -1) { 713 if (errno != EAGAIN) { 714 redisLog(REDIS_WARNING,"Write error sending DB to slave: %s", 715 strerror(errno)); 716 freeClient(slave); 717 } 718 return; 719 } 720 slave->repldboff += nwritten; 721 if (slave->repldboff == slave->repldbsize) { 722 close(slave->repldbfd); 723 slave->repldbfd = -1; 724 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 725 putSlaveOnline(slave); 726 } 727 } 728 729 /* This function is called at the end of every background saving, 730 * or when the replication RDB transfer strategy is modified from 731 * disk to socket or the other way around. 732 * 733 * The goal of this function is to handle slaves waiting for a successful 734 * background saving in order to perform non-blocking synchronization, and 735 * to schedule a new BGSAVE if there are slaves that attached while a 736 * BGSAVE was in progress, but it was not a good one for replication (no 737 * other slave was accumulating differences). 738 * 739 * The argument bgsaveerr is REDIS_OK if the background saving succeeded 740 * otherwise REDIS_ERR is passed to the function. 741 * The 'type' argument is the type of the child that terminated 742 * (if it had a disk or socket target). 
*/ 743 void updateSlavesWaitingBgsave(int bgsaveerr, int type) { 744 listNode *ln; 745 int startbgsave = 0; 746 listIter li; 747 748 listRewind(server.slaves,&li); 749 while((ln = listNext(&li))) { 750 redisClient *slave = ln->value; 751 752 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { 753 startbgsave = 1; 754 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; 755 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { 756 struct redis_stat buf; 757 758 /* If this was an RDB on disk save, we have to prepare to send 759 * the RDB from disk to the slave socket. Otherwise if this was 760 * already an RDB -> Slaves socket transfer, used in the case of 761 * diskless replication, our work is trivial, we can just put 762 * the slave online. */ 763 if (type == REDIS_RDB_CHILD_TYPE_SOCKET) { 764 redisLog(REDIS_NOTICE, 765 "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming", 766 replicationGetSlaveName(slave)); 767 /* Note: we wait for a REPLCONF ACK message from slave in 768 * order to really put it online (install the write handler 769 * so that the accumulated data can be transfered). However 770 * we change the replication state ASAP, since our slave 771 * is technically online now. */ 772 slave->replstate = REDIS_REPL_ONLINE; 773 slave->repl_put_online_on_ack = 1; 774 } else { 775 if (bgsaveerr != REDIS_OK) { 776 freeClient(slave); 777 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); 778 continue; 779 } 780 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || 781 redis_fstat(slave->repldbfd,&buf) == -1) { 782 freeClient(slave); 783 redisLog(REDIS_WARNING,"SYNC failed. 
Can't open/stat DB after BGSAVE: %s", strerror(errno)); 784 continue; 785 } 786 slave->repldboff = 0; 787 slave->repldbsize = buf.st_size; 788 slave->replstate = REDIS_REPL_SEND_BULK; 789 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", 790 (unsigned long long) slave->repldbsize); 791 792 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); 793 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { 794 freeClient(slave); 795 continue; 796 } 797 } 798 } 799 } 800 if (startbgsave) { 801 if (startBgsaveForReplication() != REDIS_OK) { 802 listIter li; 803 804 listRewind(server.slaves,&li); 805 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed"); 806 while((ln = listNext(&li))) { 807 redisClient *slave = ln->value; 808 809 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) 810 freeClient(slave); 811 } 812 } 813 } 814 } 815 816 /* ----------------------------------- SLAVE -------------------------------- */ 817 818 /* Abort the async download of the bulk dataset while SYNC-ing with master */ 819 void replicationAbortSyncTransfer(void) { 820 redisAssert(server.repl_state == REDIS_REPL_TRANSFER); 821 822 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); 823 close(server.repl_transfer_s); 824 close(server.repl_transfer_fd); 825 unlink(server.repl_transfer_tmpfile); 826 zfree(server.repl_transfer_tmpfile); 827 server.repl_state = REDIS_REPL_CONNECT; 828 } 829 830 /* Avoid the master to detect the slave is timing out while loading the 831 * RDB file in initial synchronization. We send a single newline character 832 * that is valid protocol but is guaranteed to either be sent entierly or 833 * not, since the byte is indivisible. 834 * 835 * The function is called in two contexts: while we flush the current 836 * data with emptyDb(), and while we load the new data received as an 837 * RDB file from the master. 
*/ 838 void replicationSendNewlineToMaster(void) { 839 static time_t newline_sent; 840 if (time(NULL) != newline_sent) { 841 newline_sent = time(NULL); 842 if (write(server.repl_transfer_s,"\n",1) == -1) { 843 /* Pinging back in this stage is best-effort. */ 844 } 845 } 846 } 847 848 /* Callback used by emptyDb() while flushing away old data to load 849 * the new dataset received by the master. */ 850 void replicationEmptyDbCallback(void *privdata) { 851 REDIS_NOTUSED(privdata); 852 replicationSendNewlineToMaster(); 853 } 854 855 /* Asynchronously read the SYNC payload we receive from a master */ 856 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ 857 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { 858 char buf[4096]; 859 ssize_t nread, readlen; 860 off_t left; 861 REDIS_NOTUSED(el); 862 REDIS_NOTUSED(privdata); 863 REDIS_NOTUSED(mask); 864 865 /* Static vars used to hold the EOF mark, and the last bytes received 866 * form the server: when they match, we reached the end of the transfer. */ 867 static char eofmark[REDIS_RUN_ID_SIZE]; 868 static char lastbytes[REDIS_RUN_ID_SIZE]; 869 static int usemark = 0; 870 871 /* If repl_transfer_size == -1 we still have to read the bulk length 872 * from the master reply. */ 873 if (server.repl_transfer_size == -1) { 874 if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { 875 redisLog(REDIS_WARNING, 876 "I/O error reading bulk count from MASTER: %s", 877 strerror(errno)); 878 goto error; 879 } 880 881 if (buf[0] == '-') { 882 redisLog(REDIS_WARNING, 883 "MASTER aborted replication with an error: %s", 884 buf+1); 885 goto error; 886 } else if (buf[0] == '\0') { 887 /* At this stage just a newline works as a PING in order to take 888 * the connection live. So we refresh our last interaction 889 * timestamp. 
*/ 890 server.repl_transfer_lastio = server.unixtime; 891 return; 892 } else if (buf[0] != '$') { 893 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); 894 goto error; 895 } 896 897 /* There are two possible forms for the bulk payload. One is the 898 * usual $<count> bulk format. The other is used for diskless transfers 899 * when the master does not know beforehand the size of the file to 900 * transfer. In the latter case, the following format is used: 901 * 902 * $EOF:<40 bytes delimiter> 903 * 904 * At the end of the file the announced delimiter is transmitted. The 905 * delimiter is long and random enough that the probability of a 906 * collision with the actual file content can be ignored. */ 907 if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) { 908 usemark = 1; 909 memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE); 910 memset(lastbytes,0,REDIS_RUN_ID_SIZE); 911 /* Set any repl_transfer_size to avoid entering this code path 912 * at the next call. */ 913 server.repl_transfer_size = 0; 914 redisLog(REDIS_NOTICE, 915 "MASTER <-> SLAVE sync: receiving streamed RDB from master"); 916 } else { 917 usemark = 0; 918 server.repl_transfer_size = strtol(buf+1,NULL,10); 919 redisLog(REDIS_NOTICE, 920 "MASTER <-> SLAVE sync: receiving %lld bytes from master", 921 (long long) server.repl_transfer_size); 922 } 923 return; 924 } 925 926 /* Read bulk data */ 927 if (usemark) { 928 readlen = sizeof(buf); 929 } else { 930 left = server.repl_transfer_size - server.repl_transfer_read; 931 readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); 932 } 933 934 nread = read(fd,buf,readlen); 935 if (nread <= 0) { 936 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", 937 (nread == -1) ? 
strerror(errno) : "connection lost"); 938 replicationAbortSyncTransfer(); 939 return; 940 } 941 942 /* When a mark is used, we want to detect EOF asap in order to avoid 943 * writing the EOF mark into the file... */ 944 int eof_reached = 0; 945 946 if (usemark) { 947 /* Update the last bytes array, and check if it matches our delimiter.*/ 948 if (nread >= REDIS_RUN_ID_SIZE) { 949 memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE); 950 } else { 951 int rem = REDIS_RUN_ID_SIZE-nread; 952 memmove(lastbytes,lastbytes+nread,rem); 953 memcpy(lastbytes+rem,buf,nread); 954 } 955 if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1; 956 } 957 958 server.repl_transfer_lastio = server.unixtime; 959 if (write(server.repl_transfer_fd,buf,nread) != nread) { 960 redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); 961 goto error; 962 } 963 server.repl_transfer_read += nread; 964 965 /* Delete the last 40 bytes from the file if we reached EOF. */ 966 if (usemark && eof_reached) { 967 if (ftruncate(server.repl_transfer_fd, 968 server.repl_transfer_read - REDIS_RUN_ID_SIZE) == -1) 969 { 970 redisLog(REDIS_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); 971 goto error; 972 } 973 } 974 975 /* Sync data on disk from time to time, otherwise at the end of the transfer 976 * we may suffer a big delay as the memory buffers are copied into the 977 * actual disk. 
*/ 978 if (server.repl_transfer_read >= 979 server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) 980 { 981 off_t sync_size = server.repl_transfer_read - 982 server.repl_transfer_last_fsync_off; 983 rdb_fsync_range(server.repl_transfer_fd, 984 server.repl_transfer_last_fsync_off, sync_size); 985 server.repl_transfer_last_fsync_off += sync_size; 986 } 987 988 /* Check if the transfer is now complete */ 989 if (!usemark) { 990 if (server.repl_transfer_read == server.repl_transfer_size) 991 eof_reached = 1; 992 } 993 994 if (eof_reached) { 995 if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { 996 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); 997 replicationAbortSyncTransfer(); 998 return; 999 } 1000 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data"); 1001 signalFlushedDb(-1); 1002 emptyDb(replicationEmptyDbCallback); 1003 /* Before loading the DB into memory we need to delete the readable 1004 * handler, otherwise it will get called recursively since 1005 * rdbLoad() will call the event loop to process events from time to 1006 * time for non blocking loading. 
*/ 1007 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); 1008 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); 1009 if (rdbLoad(server.rdb_filename) != REDIS_OK) { 1010 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); 1011 replicationAbortSyncTransfer(); 1012 return; 1013 } 1014 /* Final setup of the connected slave <- master link */ 1015 zfree(server.repl_transfer_tmpfile); 1016 close(server.repl_transfer_fd); 1017 server.master = createClient(server.repl_transfer_s); 1018 server.master->flags |= REDIS_MASTER; 1019 server.master->authenticated = 1; 1020 server.repl_state = REDIS_REPL_CONNECTED; 1021 server.master->reploff = server.repl_master_initial_offset; 1022 memcpy(server.master->replrunid, server.repl_master_runid, 1023 sizeof(server.repl_master_runid)); 1024 /* If master offset is set to -1, this master is old and is not 1025 * PSYNC capable, so we flag it accordingly. */ 1026 if (server.master->reploff == -1) 1027 server.master->flags |= REDIS_PRE_PSYNC; 1028 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success"); 1029 /* Restart the AOF subsystem now that we finished the sync. This 1030 * will trigger an AOF rewrite, and when done will start appending 1031 * to the new file. */ 1032 if (server.aof_state != REDIS_AOF_OFF) { 1033 int retry = 10; 1034 1035 stopAppendOnly(); 1036 while (retry-- && startAppendOnly() == REDIS_ERR) { 1037 redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second."); 1038 sleep(1); 1039 } 1040 if (!retry) { 1041 redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now."); 1042 exit(1); 1043 } 1044 } 1045 } 1046 1047 return; 1048 1049 error: 1050 replicationAbortSyncTransfer(); 1051 return; 1052 } 1053 1054 /* Send a synchronous command to the master. 
Used to send AUTH and
 * REPLCONF commands before starting the replication with SYNC.
 *
 * The arguments following 'fd' are C strings, terminated by a NULL pointer.
 * They are joined with single spaces into one inline-protocol command.
 *
 * The command returns an sds string representing the result of the
 * operation. On error the first byte is a "-". This is true both for error
 * replies read from the master and for local I/O errors, which are reported
 * as "-Writing to master: ..." / "-Reading from master: ...".
 * The returned string is a new sds: the callers release it with sdsfree().
 */
char *sendSynchronousCommand(int fd, ...) {
    va_list ap;
    sds cmd = sdsempty();
    char *arg, buf[256];

    /* Create the command to send to the master, we use simple inline
     * protocol for simplicity as currently we only send simple strings. */
    va_start(ap,fd);
    while(1) {
        arg = va_arg(ap, char*);
        if (arg == NULL) break;

        if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
        cmd = sdscat(cmd,arg);
    }
    /* Every va_start() must be matched by a va_end() in the same function. */
    va_end(ap);
    cmd = sdscatlen(cmd,"\r\n",2);

    /* Transfer command to the server. */
    if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
        sdsfree(cmd);
        return sdscatprintf(sdsempty(),"-Writing to master: %s",
                strerror(errno));
    }
    sdsfree(cmd);

    /* Read the reply from the server. */
    if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
    {
        return sdscatprintf(sdsempty(),"-Reading from master: %s",
                strerror(errno));
    }
    return sdsnew(buf);
}

/* Try a partial resynchronization with the master if we are about to reconnect.
 * If there is no cached master structure, at least try to issue a
 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
 * command in order to obtain the master run id and the master replication
 * global offset.
 *
 * This function is designed to be called from syncWithMaster(), so the
 * following assumptions are made:
 *
 * 1) We pass the function an already connected socket "fd".
 * 2) This function does not close the file descriptor "fd".
However in case 1105 * of successful partial resynchronization, the function will reuse 1106 * 'fd' as file descriptor of the server.master client structure. 1107 * 1108 * The function returns: 1109 * 1110 * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue. 1111 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed. 1112 * In this case the master run_id and global replication 1113 * offset is saved. 1114 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and 1115 * the caller should fall back to SYNC. 1116 */ 1117 1118 #define PSYNC_CONTINUE 0 1119 #define PSYNC_FULLRESYNC 1 1120 #define PSYNC_NOT_SUPPORTED 2 1121 int slaveTryPartialResynchronization(int fd) { 1122 char *psync_runid; 1123 char psync_offset[32]; 1124 sds reply; 1125 1126 /* Initially set repl_master_initial_offset to -1 to mark the current 1127 * master run_id and offset as not valid. Later if we'll be able to do 1128 * a FULL resync using the PSYNC command we'll set the offset at the 1129 * right value, so that this information will be propagated to the 1130 * client structure representing the master into server.master. */ 1131 server.repl_master_initial_offset = -1; 1132 1133 if (server.cached_master) { 1134 psync_runid = server.cached_master->replrunid; 1135 snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); 1136 redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); 1137 } else { 1138 redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)"); 1139 psync_runid = "?"; 1140 memcpy(psync_offset,"-1",3); 1141 } 1142 1143 /* Issue the PSYNC command */ 1144 reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL); 1145 1146 if (!strncmp(reply,"+FULLRESYNC",11)) { 1147 char *runid = NULL, *offset = NULL; 1148 1149 /* FULL RESYNC, parse the reply in order to extract the run id 1150 * and the replication offset. 
*/ 1151 runid = strchr(reply,' '); 1152 if (runid) { 1153 runid++; 1154 offset = strchr(runid,' '); 1155 if (offset) offset++; 1156 } 1157 if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) { 1158 redisLog(REDIS_WARNING, 1159 "Master replied with wrong +FULLRESYNC syntax."); 1160 /* This is an unexpected condition, actually the +FULLRESYNC 1161 * reply means that the master supports PSYNC, but the reply 1162 * format seems wrong. To stay safe we blank the master 1163 * runid to make sure next PSYNCs will fail. */ 1164 memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1); 1165 } else { 1166 memcpy(server.repl_master_runid, runid, offset-runid-1); 1167 server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0'; 1168 server.repl_master_initial_offset = strtoll(offset,NULL,10); 1169 redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld", 1170 server.repl_master_runid, 1171 server.repl_master_initial_offset); 1172 } 1173 /* We are going to full resync, discard the cached master structure. */ 1174 replicationDiscardCachedMaster(); 1175 sdsfree(reply); 1176 return PSYNC_FULLRESYNC; 1177 } 1178 1179 if (!strncmp(reply,"+CONTINUE",9)) { 1180 /* Partial resync was accepted, set the replication state accordingly */ 1181 redisLog(REDIS_NOTICE, 1182 "Successful partial resynchronization with master."); 1183 sdsfree(reply); 1184 replicationResurrectCachedMaster(fd); 1185 return PSYNC_CONTINUE; 1186 } 1187 1188 /* If we reach this point we receied either an error since the master does 1189 * not understand PSYNC, or an unexpected reply from the master. 1190 * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */ 1191 1192 if (strncmp(reply,"-ERR",4)) { 1193 /* If it's not an error, log the unexpected event. 
*/ 1194 redisLog(REDIS_WARNING, 1195 "Unexpected reply to PSYNC from master: %s", reply); 1196 } else { 1197 redisLog(REDIS_NOTICE, 1198 "Master does not support PSYNC or is in " 1199 "error state (reply: %s)", reply); 1200 } 1201 sdsfree(reply); 1202 replicationDiscardCachedMaster(); 1203 return PSYNC_NOT_SUPPORTED; 1204 } 1205 1206 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { 1207 char tmpfile[256], *err; 1208 int dfd, maxtries = 5; 1209 int sockerr = 0, psync_result; 1210 socklen_t errlen = sizeof(sockerr); 1211 REDIS_NOTUSED(el); 1212 REDIS_NOTUSED(privdata); 1213 REDIS_NOTUSED(mask); 1214 1215 /* If this event fired after the user turned the instance into a master 1216 * with SLAVEOF NO ONE we must just return ASAP. */ 1217 if (server.repl_state == REDIS_REPL_NONE) { 1218 close(fd); 1219 return; 1220 } 1221 1222 /* Check for errors in the socket. */ 1223 if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) 1224 sockerr = errno; 1225 if (sockerr) { 1226 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1227 redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s", 1228 strerror(sockerr)); 1229 goto error; 1230 } 1231 1232 /* If we were connecting, it's time to send a non blocking PING, we want to 1233 * make sure the master is able to reply before going into the actual 1234 * replication process where we have long timeouts in the order of 1235 * seconds (in the meantime the slave would block). */ 1236 if (server.repl_state == REDIS_REPL_CONNECTING) { 1237 redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event."); 1238 /* Delete the writable event so that the readable event remains 1239 * registered and we can wait for the PONG reply. */ 1240 aeDeleteFileEvent(server.el,fd,AE_WRITABLE); 1241 server.repl_state = REDIS_REPL_RECEIVE_PONG; 1242 /* Send the PING, don't check for errors at all, we have the timeout 1243 * that will take care about this. 
*/ 1244 syncWrite(fd,"PING\r\n",6,100); 1245 return; 1246 } 1247 1248 /* Receive the PONG command. */ 1249 if (server.repl_state == REDIS_REPL_RECEIVE_PONG) { 1250 char buf[1024]; 1251 1252 /* Delete the readable event, we no longer need it now that there is 1253 * the PING reply to read. */ 1254 aeDeleteFileEvent(server.el,fd,AE_READABLE); 1255 1256 /* Read the reply with explicit timeout. */ 1257 buf[0] = '\0'; 1258 if (syncReadLine(fd,buf,sizeof(buf), 1259 server.repl_syncio_timeout*1000) == -1) 1260 { 1261 redisLog(REDIS_WARNING, 1262 "I/O error reading PING reply from master: %s", 1263 strerror(errno)); 1264 goto error; 1265 } 1266 1267 /* We accept only two replies as valid, a positive +PONG reply 1268 * (we just check for "+") or an authentication error. 1269 * Note that older versions of Redis replied with "operation not 1270 * permitted" instead of using a proper error code, so we test 1271 * both. */ 1272 if (buf[0] != '+' && 1273 strncmp(buf,"-NOAUTH",7) != 0 && 1274 strncmp(buf,"-ERR operation not permitted",28) != 0) 1275 { 1276 redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf); 1277 goto error; 1278 } else { 1279 redisLog(REDIS_NOTICE, 1280 "Master replied to PING, replication can continue..."); 1281 } 1282 } 1283 1284 /* AUTH with the master if required. */ 1285 if(server.masterauth) { 1286 err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL); 1287 if (err[0] == '-') { 1288 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err); 1289 sdsfree(err); 1290 goto error; 1291 } 1292 sdsfree(err); 1293 } 1294 1295 /* Set the slave port, so that Master's INFO command can list the 1296 * slave listening port correctly. */ 1297 { 1298 sds port = sdsfromlonglong(server.port); 1299 err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port, 1300 NULL); 1301 sdsfree(port); 1302 /* Ignore the error if any, not all the Redis versions support 1303 * REPLCONF listening-port. 
*/ 1304 if (err[0] == '-') { 1305 redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err); 1306 } 1307 sdsfree(err); 1308 } 1309 1310 /* Try a partial resynchonization. If we don't have a cached master 1311 * slaveTryPartialResynchronization() will at least try to use PSYNC 1312 * to start a full resynchronization so that we get the master run id 1313 * and the global offset, to try a partial resync at the next 1314 * reconnection attempt. */ 1315 psync_result = slaveTryPartialResynchronization(fd); 1316 if (psync_result == PSYNC_CONTINUE) { 1317 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization."); 1318 return; 1319 } 1320 1321 /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC 1322 * and the server.repl_master_runid and repl_master_initial_offset are 1323 * already populated. */ 1324 if (psync_result == PSYNC_NOT_SUPPORTED) { 1325 redisLog(REDIS_NOTICE,"Retrying with SYNC..."); 1326 if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { 1327 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s", 1328 strerror(errno)); 1329 goto error; 1330 } 1331 } 1332 1333 /* Prepare a suitable temp file for bulk transfer */ 1334 while(maxtries--) { 1335 snprintf(tmpfile,256, 1336 "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); 1337 dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); 1338 if (dfd != -1) break; 1339 sleep(1); 1340 } 1341 if (dfd == -1) { 1342 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno)); 1343 goto error; 1344 } 1345 1346 /* Setup the non blocking download of the bulk file. 
*/ 1347 if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) 1348 == AE_ERR) 1349 { 1350 redisLog(REDIS_WARNING, 1351 "Can't create readable event for SYNC: %s (fd=%d)", 1352 strerror(errno),fd); 1353 goto error; 1354 } 1355 1356 server.repl_state = REDIS_REPL_TRANSFER; 1357 server.repl_transfer_size = -1; 1358 server.repl_transfer_read = 0; 1359 server.repl_transfer_last_fsync_off = 0; 1360 server.repl_transfer_fd = dfd; 1361 server.repl_transfer_lastio = server.unixtime; 1362 server.repl_transfer_tmpfile = zstrdup(tmpfile); 1363 return; 1364 1365 error: 1366 close(fd); 1367 server.repl_transfer_s = -1; 1368 server.repl_state = REDIS_REPL_CONNECT; 1369 return; 1370 } 1371 1372 int connectWithMaster(void) { 1373 int fd; 1374 1375 fd = anetTcpNonBlockBindConnect(NULL, 1376 server.masterhost,server.masterport,REDIS_BIND_ADDR); 1377 if (fd == -1) { 1378 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s", 1379 strerror(errno)); 1380 return REDIS_ERR; 1381 } 1382 1383 if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == 1384 AE_ERR) 1385 { 1386 close(fd); 1387 redisLog(REDIS_WARNING,"Can't create readable event for SYNC"); 1388 return REDIS_ERR; 1389 } 1390 1391 server.repl_transfer_lastio = server.unixtime; 1392 server.repl_transfer_s = fd; 1393 server.repl_state = REDIS_REPL_CONNECTING; 1394 return REDIS_OK; 1395 } 1396 1397 /* This function can be called when a non blocking connection is currently 1398 * in progress to undo it. 
*/ 1399 void undoConnectWithMaster(void) { 1400 int fd = server.repl_transfer_s; 1401 1402 redisAssert(server.repl_state == REDIS_REPL_CONNECTING || 1403 server.repl_state == REDIS_REPL_RECEIVE_PONG); 1404 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); 1405 close(fd); 1406 server.repl_transfer_s = -1; 1407 server.repl_state = REDIS_REPL_CONNECT; 1408 } 1409 1410 /* This function aborts a non blocking replication attempt if there is one 1411 * in progress, by canceling the non-blocking connect attempt or 1412 * the initial bulk transfer. 1413 * 1414 * If there was a replication handshake in progress 1 is returned and 1415 * the replication state (server.repl_state) set to REDIS_REPL_CONNECT. 1416 * 1417 * Otherwise zero is returned and no operation is perforemd at all. */ 1418 int cancelReplicationHandshake(void) { 1419 if (server.repl_state == REDIS_REPL_TRANSFER) { 1420 replicationAbortSyncTransfer(); 1421 } else if (server.repl_state == REDIS_REPL_CONNECTING || 1422 server.repl_state == REDIS_REPL_RECEIVE_PONG) 1423 { 1424 undoConnectWithMaster(); 1425 } else { 1426 return 0; 1427 } 1428 return 1; 1429 } 1430 1431 /* Set replication to the specified master address and port. */ 1432 void replicationSetMaster(char *ip, int port) { 1433 sdsfree(server.masterhost); 1434 server.masterhost = sdsnew(ip); 1435 server.masterport = port; 1436 if (server.master) freeClient(server.master); 1437 disconnectSlaves(); /* Force our slaves to resync with us as well. */ 1438 replicationDiscardCachedMaster(); /* Don't try a PSYNC. */ 1439 freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ 1440 cancelReplicationHandshake(); 1441 server.repl_state = REDIS_REPL_CONNECT; 1442 server.master_repl_offset = 0; 1443 server.repl_down_since = 0; 1444 } 1445 1446 /* Cancel replication, setting the instance as a master itself. */ 1447 void replicationUnsetMaster(void) { 1448 if (server.masterhost == NULL) return; /* Nothing to do. 
*/ 1449 sdsfree(server.masterhost); 1450 server.masterhost = NULL; 1451 if (server.master) { 1452 if (listLength(server.slaves) == 0) { 1453 /* If this instance is turned into a master and there are no 1454 * slaves, it inherits the replication offset from the master. 1455 * Under certain conditions this makes replicas comparable by 1456 * replication offset to understand what is the most updated. */ 1457 server.master_repl_offset = server.master->reploff; 1458 freeReplicationBacklog(); 1459 } 1460 freeClient(server.master); 1461 } 1462 replicationDiscardCachedMaster(); 1463 cancelReplicationHandshake(); 1464 server.repl_state = REDIS_REPL_NONE; 1465 } 1466 1467 void slaveofCommand(redisClient *c) { 1468 /* SLAVEOF is not allowed in cluster mode as replication is automatically 1469 * configured using the current address of the master node. */ 1470 if (server.cluster_enabled) { 1471 addReplyError(c,"SLAVEOF not allowed in cluster mode."); 1472 return; 1473 } 1474 1475 /* The special host/port combination "NO" "ONE" turns the instance 1476 * into a master. Otherwise the new master address is set. */ 1477 if (!strcasecmp(c->argv[1]->ptr,"no") && 1478 !strcasecmp(c->argv[2]->ptr,"one")) { 1479 if (server.masterhost) { 1480 replicationUnsetMaster(); 1481 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)"); 1482 } 1483 } else { 1484 long port; 1485 1486 if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK)) 1487 return; 1488 1489 /* Check if we are already attached to the specified slave */ 1490 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) 1491 && server.masterport == port) { 1492 redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed."); 1493 addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); 1494 return; 1495 } 1496 /* There was no previous master or the user specified a different one, 1497 * we can continue. 
*/ 1498 replicationSetMaster(c->argv[1]->ptr, port); 1499 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)", 1500 server.masterhost, server.masterport); 1501 } 1502 addReply(c,shared.ok); 1503 } 1504 1505 /* ROLE command: provide information about the role of the instance 1506 * (master or slave) and additional information related to replication 1507 * in an easy to process format. */ 1508 void roleCommand(redisClient *c) { 1509 if (server.masterhost == NULL) { 1510 listIter li; 1511 listNode *ln; 1512 void *mbcount; 1513 int slaves = 0; 1514 1515 addReplyMultiBulkLen(c,3); 1516 addReplyBulkCBuffer(c,"master",6); 1517 addReplyLongLong(c,server.master_repl_offset); 1518 mbcount = addDeferredMultiBulkLength(c); 1519 listRewind(server.slaves,&li); 1520 while((ln = listNext(&li))) { 1521 redisClient *slave = ln->value; 1522 char ip[REDIS_IP_STR_LEN]; 1523 1524 if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1) continue; 1525 if (slave->replstate != REDIS_REPL_ONLINE) continue; 1526 addReplyMultiBulkLen(c,3); 1527 addReplyBulkCString(c,ip); 1528 addReplyBulkLongLong(c,slave->slave_listening_port); 1529 addReplyBulkLongLong(c,slave->repl_ack_off); 1530 slaves++; 1531 } 1532 setDeferredMultiBulkLength(c,mbcount,slaves); 1533 } else { 1534 char *slavestate = NULL; 1535 1536 addReplyMultiBulkLen(c,5); 1537 addReplyBulkCBuffer(c,"slave",5); 1538 addReplyBulkCString(c,server.masterhost); 1539 addReplyLongLong(c,server.masterport); 1540 switch(server.repl_state) { 1541 case REDIS_REPL_NONE: slavestate = "none"; break; 1542 case REDIS_REPL_CONNECT: slavestate = "connect"; break; 1543 case REDIS_REPL_CONNECTING: slavestate = "connecting"; break; 1544 case REDIS_REPL_RECEIVE_PONG: /* see next */ 1545 case REDIS_REPL_TRANSFER: slavestate = "sync"; break; 1546 case REDIS_REPL_CONNECTED: slavestate = "connected"; break; 1547 default: slavestate = "unknown"; break; 1548 } 1549 addReplyBulkCString(c,slavestate); 1550 addReplyLongLong(c,server.master ? 
server.master->reploff : -1); 1551 } 1552 } 1553 1554 /* Send a REPLCONF ACK command to the master to inform it about the current 1555 * processed offset. If we are not connected with a master, the command has 1556 * no effects. */ 1557 void replicationSendAck(void) { 1558 redisClient *c = server.master; 1559 1560 if (c != NULL) { 1561 c->flags |= REDIS_MASTER_FORCE_REPLY; 1562 addReplyMultiBulkLen(c,3); 1563 addReplyBulkCString(c,"REPLCONF"); 1564 addReplyBulkCString(c,"ACK"); 1565 addReplyBulkLongLong(c,c->reploff); 1566 c->flags &= ~REDIS_MASTER_FORCE_REPLY; 1567 } 1568 } 1569 1570 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */ 1571 1572 /* In order to implement partial synchronization we need to be able to cache 1573 * our master's client structure after a transient disconnection. 1574 * It is cached into server.cached_master and flushed away using the following 1575 * functions. */ 1576 1577 /* This function is called by freeClient() in order to cache the master 1578 * client structure instead of destryoing it. freeClient() will return 1579 * ASAP after this function returns, so every action needed to avoid problems 1580 * with a client that is really "suspended" has to be done by this function. 1581 * 1582 * The other functions that will deal with the cached master are: 1583 * 1584 * replicationDiscardCachedMaster() that will make sure to kill the client 1585 * as for some reason we don't want to use it in the future. 1586 * 1587 * replicationResurrectCachedMaster() that is used after a successful PSYNC 1588 * handshake in order to reactivate the cached master. 
1589 */ 1590 void replicationCacheMaster(redisClient *c) { 1591 listNode *ln; 1592 1593 redisAssert(server.master != NULL && server.cached_master == NULL); 1594 redisLog(REDIS_NOTICE,"Caching the disconnected master state."); 1595 1596 /* Remove from the list of clients, we don't want this client to be 1597 * listed by CLIENT LIST or processed in any way by batch operations. */ 1598 ln = listSearchKey(server.clients,c); 1599 redisAssert(ln != NULL); 1600 listDelNode(server.clients,ln); 1601 1602 /* Save the master. Server.master will be set to null later by 1603 * replicationHandleMasterDisconnection(). */ 1604 server.cached_master = server.master; 1605 1606 /* Remove the event handlers and close the socket. We'll later reuse 1607 * the socket of the new connection with the master during PSYNC. */ 1608 aeDeleteFileEvent(server.el,c->fd,AE_READABLE); 1609 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); 1610 close(c->fd); 1611 1612 /* Set fd to -1 so that we can safely call freeClient(c) later. */ 1613 c->fd = -1; 1614 1615 /* Invalidate the Peer ID cache. */ 1616 if (c->peerid) { 1617 sdsfree(c->peerid); 1618 c->peerid = NULL; 1619 } 1620 1621 /* Caching the master happens instead of the actual freeClient() call, 1622 * so make sure to adjust the replication state. This function will 1623 * also set server.master to NULL. */ 1624 replicationHandleMasterDisconnection(); 1625 } 1626 1627 /* Free a cached master, called when there are no longer the conditions for 1628 * a partial resync on reconnection. */ 1629 void replicationDiscardCachedMaster(void) { 1630 if (server.cached_master == NULL) return; 1631 1632 redisLog(REDIS_NOTICE,"Discarding previously cached master state."); 1633 server.cached_master->flags &= ~REDIS_MASTER; 1634 freeClient(server.cached_master); 1635 server.cached_master = NULL; 1636 } 1637 1638 /* Turn the cached master into the current master, using the file descriptor 1639 * passed as argument as the socket for the new master. 
1640 * 1641 * This function is called when successfully setup a partial resynchronization 1642 * so the stream of data that we'll receive will start from were this 1643 * master left. */ 1644 void replicationResurrectCachedMaster(int newfd) { 1645 server.master = server.cached_master; 1646 server.cached_master = NULL; 1647 server.master->fd = newfd; 1648 server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP); 1649 server.master->authenticated = 1; 1650 server.master->lastinteraction = server.unixtime; 1651 server.repl_state = REDIS_REPL_CONNECTED; 1652 1653 /* Re-add to the list of clients. */ 1654 listAddNodeTail(server.clients,server.master); 1655 if (aeCreateFileEvent(server.el, newfd, AE_READABLE, 1656 readQueryFromClient, server.master)) { 1657 redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); 1658 freeClientAsync(server.master); /* Close ASAP. */ 1659 } 1660 1661 /* We may also need to install the write handler as well if there is 1662 * pending data in the write buffers. */ 1663 if (server.master->bufpos || listLength(server.master->reply)) { 1664 if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, 1665 sendReplyToClient, server.master)) { 1666 redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); 1667 freeClientAsync(server.master); /* Close ASAP. */ 1668 } 1669 } 1670 } 1671 1672 /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */ 1673 1674 /* This function counts the number of slaves with lag <= min-slaves-max-lag. 1675 * If the option is active, the server will prevent writes if there are not 1676 * enough connected slaves with the specified lag (or less). 
*/
void refreshGoodSlavesCount(void) {
    listIter iter;
    listNode *node;
    int count = 0;

    /* Nothing to compute unless both options are set to a non-zero value. */
    if (server.repl_min_slaves_to_write == 0 ||
        server.repl_min_slaves_max_lag == 0) return;

    listRewind(server.slaves,&iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        redisClient *slave = node->value;
        /* Seconds elapsed since the last ACK recorded for this slave. */
        time_t lag = server.unixtime - slave->repl_ack_time;

        /* Only online slaves within the allowed lag are "good". */
        if (slave->replstate != REDIS_REPL_ONLINE) continue;
        if (lag <= server.repl_min_slaves_max_lag) count++;
    }
    server.repl_good_slaves_count = count;
}

/* ----------------------- REPLICATION SCRIPT CACHE --------------------------
 * The goal of this code is to keep track of scripts already sent to every
 * connected slave, in order to be able to replicate EVALSHA as it is without
 * translating it to EVAL every time it is possible.
 *
 * We use a capped collection implemented by a hash table for fast lookup
 * of scripts we can send as EVALSHA, plus a linked list that is used for
 * eviction of the oldest entry when the max number of items is reached.
 *
 * We don't care about taking a different cache for every different slave
 * since to fill the cache again is not very costly, the goal of this code
 * is to avoid that the same big script is transmitted a big number of times
 * per second wasting bandwidth and processor speed, but it is not a problem
 * if we need to rebuild the cache from scratch from time to time, every used
 * script will need to be transmitted a single time to reappear in the cache.
 *
 * This is how the system works:
 *
 * 1) Every time a new slave connects, we flush the whole script cache.
 * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
 *    trying to convert EVAL into EVALSHA specifically for slaves.
1717 * 3) Every time we trasmit a script as EVAL to the slaves, we also add the 1718 * corresponding SHA1 of the script into the cache as we are sure every 1719 * slave knows about the script starting from now. 1720 * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves 1721 * and at the same time flush the script cache. 1722 * 5) When the last slave disconnects, flush the cache. 1723 * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded 1724 * in the master sometimes. 1725 */ 1726 1727 /* Initialize the script cache, only called at startup. */ 1728 void replicationScriptCacheInit(void) { 1729 server.repl_scriptcache_size = 10000; 1730 server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL); 1731 server.repl_scriptcache_fifo = listCreate(); 1732 } 1733 1734 /* Empty the script cache. Should be called every time we are no longer sure 1735 * that every slave knows about all the scripts in our set, or when the 1736 * current AOF "context" is no longer aware of the script. In general we 1737 * should flush the cache: 1738 * 1739 * 1) Every time a new slave reconnects to this master and performs a 1740 * full SYNC (PSYNC does not require flushing). 1741 * 2) Every time an AOF rewrite is performed. 1742 * 3) Every time we are left without slaves at all, and AOF is off, in order 1743 * to reclaim otherwise unused memory. 1744 */ 1745 void replicationScriptCacheFlush(void) { 1746 dictEmpty(server.repl_scriptcache_dict,NULL); 1747 listRelease(server.repl_scriptcache_fifo); 1748 server.repl_scriptcache_fifo = listCreate(); 1749 } 1750 1751 /* Add an entry into the script cache, if we reach max number of entries the 1752 * oldest is removed from the list. */ 1753 void replicationScriptCacheAdd(sds sha1) { 1754 int retval; 1755 sds key = sdsdup(sha1); 1756 1757 /* Evict oldest. 
*/ 1758 if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size) 1759 { 1760 listNode *ln = listLast(server.repl_scriptcache_fifo); 1761 sds oldest = listNodeValue(ln); 1762 1763 retval = dictDelete(server.repl_scriptcache_dict,oldest); 1764 redisAssert(retval == DICT_OK); 1765 listDelNode(server.repl_scriptcache_fifo,ln); 1766 } 1767 1768 /* Add current. */ 1769 retval = dictAdd(server.repl_scriptcache_dict,key,NULL); 1770 listAddNodeHead(server.repl_scriptcache_fifo,key); 1771 redisAssert(retval == DICT_OK); 1772 } 1773 1774 /* Returns non-zero if the specified entry exists inside the cache, that is, 1775 * if all the slaves are aware of this script SHA1. */ 1776 int replicationScriptCacheExists(sds sha1) { 1777 return dictFind(server.repl_scriptcache_dict,sha1) != NULL; 1778 } 1779 1780 /* ----------------------- SYNCHRONOUS REPLICATION -------------------------- 1781 * Redis synchronous replication design can be summarized in points: 1782 * 1783 * - Redis masters have a global replication offset, used by PSYNC. 1784 * - Master increment the offset every time new commands are sent to slaves. 1785 * - Slaves ping back masters with the offset processed so far. 1786 * 1787 * So synchronous replication adds a new WAIT command in the form: 1788 * 1789 * WAIT <num_replicas> <milliseconds_timeout> 1790 * 1791 * That returns the number of replicas that processed the query when 1792 * we finally have at least num_replicas, or when the timeout was 1793 * reached. 1794 * 1795 * The command is implemented in this way: 1796 * 1797 * - Every time a client processes a command, we remember the replication 1798 * offset after sending that command to the slaves. 1799 * - When WAIT is called, we ask slaves to send an acknowledgement ASAP. 1800 * The client is blocked at the same time (see blocked.c). 
1801 * - Once we receive enough ACKs for a given offset or when the timeout 1802 * is reached, the WAIT command is unblocked and the reply sent to the 1803 * client. 1804 */ 1805 1806 /* This just set a flag so that we broadcast a REPLCONF GETACK command 1807 * to all the slaves in the beforeSleep() function. Note that this way 1808 * we "group" all the clients that want to wait for synchronouns replication 1809 * in a given event loop iteration, and send a single GETACK for them all. */ 1810 void replicationRequestAckFromSlaves(void) { 1811 server.get_ack_from_slaves = 1; 1812 } 1813 1814 /* Return the number of slaves that already acknowledged the specified 1815 * replication offset. */ 1816 int replicationCountAcksByOffset(long long offset) { 1817 listIter li; 1818 listNode *ln; 1819 int count = 0; 1820 1821 listRewind(server.slaves,&li); 1822 while((ln = listNext(&li))) { 1823 redisClient *slave = ln->value; 1824 1825 if (slave->replstate != REDIS_REPL_ONLINE) continue; 1826 if (slave->repl_ack_off >= offset) count++; 1827 } 1828 return count; 1829 } 1830 1831 /* WAIT for N replicas to acknowledge the processing of our latest 1832 * write command (and all the previous commands). */ 1833 void waitCommand(redisClient *c) { 1834 mstime_t timeout; 1835 long numreplicas, ackreplicas; 1836 long long offset = c->woff; 1837 1838 /* Argument parsing. */ 1839 if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != REDIS_OK) 1840 return; 1841 if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS) 1842 != REDIS_OK) return; 1843 1844 /* First try without blocking at all. */ 1845 ackreplicas = replicationCountAcksByOffset(c->woff); 1846 if (ackreplicas >= numreplicas || c->flags & REDIS_MULTI) { 1847 addReplyLongLong(c,ackreplicas); 1848 return; 1849 } 1850 1851 /* Otherwise block the client and put it into our list of clients 1852 * waiting for ack from slaves. 
*/ 1853 c->bpop.timeout = timeout; 1854 c->bpop.reploffset = offset; 1855 c->bpop.numreplicas = numreplicas; 1856 listAddNodeTail(server.clients_waiting_acks,c); 1857 blockClient(c,REDIS_BLOCKED_WAIT); 1858 1859 /* Make sure that the server will send an ACK request to all the slaves 1860 * before returning to the event loop. */ 1861 replicationRequestAckFromSlaves(); 1862 } 1863 1864 /* This is called by unblockClient() to perform the blocking op type 1865 * specific cleanup. We just remove the client from the list of clients 1866 * waiting for replica acks. Never call it directly, call unblockClient() 1867 * instead. */ 1868 void unblockClientWaitingReplicas(redisClient *c) { 1869 listNode *ln = listSearchKey(server.clients_waiting_acks,c); 1870 redisAssert(ln != NULL); 1871 listDelNode(server.clients_waiting_acks,ln); 1872 } 1873 1874 /* Check if there are clients blocked in WAIT that can be unblocked since 1875 * we received enough ACKs from slaves. */ 1876 void processClientsWaitingReplicas(void) { 1877 long long last_offset = 0; 1878 int last_numreplicas = 0; 1879 1880 listIter li; 1881 listNode *ln; 1882 1883 listRewind(server.clients_waiting_acks,&li); 1884 while((ln = listNext(&li))) { 1885 redisClient *c = ln->value; 1886 1887 /* Every time we find a client that is satisfied for a given 1888 * offset and number of replicas, we remember it so the next client 1889 * may be unblocked without calling replicationCountAcksByOffset() 1890 * if the requested offset / replicas were equal or less. 
*/ 1891 if (last_offset && last_offset > c->bpop.reploffset && 1892 last_numreplicas > c->bpop.numreplicas) 1893 { 1894 unblockClient(c); 1895 addReplyLongLong(c,last_numreplicas); 1896 } else { 1897 int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset); 1898 1899 if (numreplicas >= c->bpop.numreplicas) { 1900 last_offset = c->bpop.reploffset; 1901 last_numreplicas = numreplicas; 1902 unblockClient(c); 1903 addReplyLongLong(c,numreplicas); 1904 } 1905 } 1906 } 1907 } 1908 1909 /* Return the slave replication offset for this instance, that is 1910 * the offset for which we already processed the master replication stream. */ 1911 long long replicationGetSlaveOffset(void) { 1912 long long offset = 0; 1913 1914 if (server.masterhost != NULL) { 1915 if (server.master) { 1916 offset = server.master->reploff; 1917 } else if (server.cached_master) { 1918 offset = server.cached_master->reploff; 1919 } 1920 } 1921 /* offset may be -1 when the master does not support it at all, however 1922 * this function is designed to return an offset that can express the 1923 * amount of data processed by the master, so we return a positive 1924 * integer. */ 1925 if (offset < 0) offset = 0; 1926 return offset; 1927 } 1928 1929 /* --------------------------- REPLICATION CRON ---------------------------- */ 1930 1931 /* Replication cron function, called 1 time per second. */ 1932 void replicationCron(void) { 1933 /* Non blocking connection timeout? */ 1934 if (server.masterhost && 1935 (server.repl_state == REDIS_REPL_CONNECTING || 1936 server.repl_state == REDIS_REPL_RECEIVE_PONG) && 1937 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) 1938 { 1939 redisLog(REDIS_WARNING,"Timeout connecting to the MASTER..."); 1940 undoConnectWithMaster(); 1941 } 1942 1943 /* Bulk transfer I/O timeout? 
*/ 1944 if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER && 1945 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) 1946 { 1947 redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value."); 1948 replicationAbortSyncTransfer(); 1949 } 1950 1951 /* Timed out master when we are an already connected slave? */ 1952 if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED && 1953 (time(NULL)-server.master->lastinteraction) > server.repl_timeout) 1954 { 1955 redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received..."); 1956 freeClient(server.master); 1957 } 1958 1959 /* Check if we should connect to a MASTER */ 1960 if (server.repl_state == REDIS_REPL_CONNECT) { 1961 redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d", 1962 server.masterhost, server.masterport); 1963 if (connectWithMaster() == REDIS_OK) { 1964 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started"); 1965 } 1966 } 1967 1968 /* Send ACK to master from time to time. 1969 * Note that we do not send periodic acks to masters that don't 1970 * support PSYNC and replication offsets. */ 1971 if (server.masterhost && server.master && 1972 !(server.master->flags & REDIS_PRE_PSYNC)) 1973 replicationSendAck(); 1974 1975 /* If we have attached slaves, PING them from time to time. 1976 * So slaves can implement an explicit timeout to masters, and will 1977 * be able to detect a link disconnection even if the TCP connection 1978 * will not actually go down. 
*/ 1979 if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) { 1980 listIter li; 1981 listNode *ln; 1982 robj *ping_argv[1]; 1983 1984 /* First, send PING */ 1985 ping_argv[0] = createStringObject("PING",4); 1986 replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); 1987 decrRefCount(ping_argv[0]); 1988 1989 /* Second, send a newline to all the slaves in pre-synchronization 1990 * stage, that is, slaves waiting for the master to create the RDB file. 1991 * The newline will be ignored by the slave but will refresh the 1992 * last-io timer preventing a timeout. */ 1993 listRewind(server.slaves,&li); 1994 while((ln = listNext(&li))) { 1995 redisClient *slave = ln->value; 1996 1997 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START || 1998 (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END && 1999 server.rdb_child_type != REDIS_RDB_CHILD_TYPE_SOCKET)) 2000 { 2001 if (write(slave->fd, "\n", 1) == -1) { 2002 /* Don't worry, it's just a ping. */ 2003 } 2004 } 2005 } 2006 } 2007 2008 /* Disconnect timedout slaves. */ 2009 if (listLength(server.slaves)) { 2010 listIter li; 2011 listNode *ln; 2012 2013 listRewind(server.slaves,&li); 2014 while((ln = listNext(&li))) { 2015 redisClient *slave = ln->value; 2016 2017 if (slave->replstate != REDIS_REPL_ONLINE) continue; 2018 if (slave->flags & REDIS_PRE_PSYNC) continue; 2019 if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) 2020 { 2021 redisLog(REDIS_WARNING, "Disconnecting timedout slave: %s", 2022 replicationGetSlaveName(slave)); 2023 freeClient(slave); 2024 } 2025 } 2026 } 2027 2028 /* If we have no attached slaves and there is a replication backlog 2029 * using memory, free it after some (configured) time. 
*/ 2030 if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && 2031 server.repl_backlog) 2032 { 2033 time_t idle = server.unixtime - server.repl_no_slaves_since; 2034 2035 if (idle > server.repl_backlog_time_limit) { 2036 freeReplicationBacklog(); 2037 redisLog(REDIS_NOTICE, 2038 "Replication backlog freed after %d seconds " 2039 "without connected slaves.", 2040 (int) server.repl_backlog_time_limit); 2041 } 2042 } 2043 2044 /* If AOF is disabled and we no longer have attached slaves, we can 2045 * free our Replication Script Cache as there is no need to propagate 2046 * EVALSHA at all. */ 2047 if (listLength(server.slaves) == 0 && 2048 server.aof_state == REDIS_AOF_OFF && 2049 listLength(server.repl_scriptcache_fifo) != 0) 2050 { 2051 replicationScriptCacheFlush(); 2052 } 2053 2054 /* If we are using diskless replication and there are slaves waiting 2055 * in WAIT_BGSAVE_START state, check if enough seconds elapsed and 2056 * start a BGSAVE. 2057 * 2058 * This code is also useful to trigger a BGSAVE if the diskless 2059 * replication was turned off with CONFIG SET, while there were already 2060 * slaves in WAIT_BGSAVE_START state. */ 2061 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { 2062 time_t idle, max_idle = 0; 2063 int slaves_waiting = 0; 2064 listNode *ln; 2065 listIter li; 2066 2067 listRewind(server.slaves,&li); 2068 while((ln = listNext(&li))) { 2069 redisClient *slave = ln->value; 2070 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { 2071 idle = server.unixtime - slave->lastinteraction; 2072 if (idle > max_idle) max_idle = idle; 2073 slaves_waiting++; 2074 } 2075 } 2076 2077 if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) { 2078 /* Start a BGSAVE. Usually with socket target, or with disk target 2079 * if there was a recent socket -> disk config change. */ 2080 if (startBgsaveForReplication() == REDIS_OK) { 2081 /* It started! 
We need to change the state of slaves 2082 * from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case 2083 * the current target is disk. Otherwise it was already done 2084 * by rdbSaveToSlavesSockets() which is called by 2085 * startBgsaveForReplication(). */ 2086 listRewind(server.slaves,&li); 2087 while((ln = listNext(&li))) { 2088 redisClient *slave = ln->value; 2089 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) 2090 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; 2091 } 2092 } 2093 } 2094 } 2095 2096 /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ 2097 refreshGoodSlavesCount(); 2098 } 2099