xref: /redis-3.2.3/src/replication.c (revision 8a09e129)
1 /* Asynchronous replication implementation.
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 
32 #include "redis.h"
33 
34 #include <sys/time.h>
35 #include <unistd.h>
36 #include <fcntl.h>
37 #include <sys/socket.h>
38 #include <sys/stat.h>
39 
40 void replicationDiscardCachedMaster(void);
41 void replicationResurrectCachedMaster(int newfd);
42 void replicationSendAck(void);
43 void putSlaveOnline(redisClient *slave);
44 
45 /* --------------------------- Utility functions ---------------------------- */
46 
47 /* Return the pointer to a string representing the slave ip:listening_port
48  * pair. Mostly useful for logging, since we want to log a slave using its
49  * IP address and it's listening port which is more clear for the user, for
50  * example: "Closing connection with slave 10.1.2.3:6380". */
51 char *replicationGetSlaveName(redisClient *c) {
52     static char buf[REDIS_PEER_ID_LEN];
53     char ip[REDIS_IP_STR_LEN];
54 
55     ip[0] = '\0';
56     buf[0] = '\0';
57     if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) {
58         if (c->slave_listening_port)
59             snprintf(buf,sizeof(buf),"%s:%d",ip,c->slave_listening_port);
60         else
61             snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip);
62     } else {
63         snprintf(buf,sizeof(buf),"client id #%llu",
64             (unsigned long long) c->id);
65     }
66     return buf;
67 }
68 
69 /* ---------------------------------- MASTER -------------------------------- */
70 
71 void createReplicationBacklog(void) {
72     redisAssert(server.repl_backlog == NULL);
73     server.repl_backlog = zmalloc(server.repl_backlog_size);
74     server.repl_backlog_histlen = 0;
75     server.repl_backlog_idx = 0;
76     /* When a new backlog buffer is created, we increment the replication
77      * offset by one to make sure we'll not be able to PSYNC with any
78      * previous slave. This is needed because we avoid incrementing the
79      * master_repl_offset if no backlog exists nor slaves are attached. */
80     server.master_repl_offset++;
81 
82     /* We don't have any data inside our buffer, but virtually the first
83      * byte we have is the next byte that will be generated for the
84      * replication stream. */
85     server.repl_backlog_off = server.master_repl_offset+1;
86 }
87 
88 /* This function is called when the user modifies the replication backlog
89  * size at runtime. It is up to the function to both update the
90  * server.repl_backlog_size and to resize the buffer and setup it so that
91  * it contains the same data as the previous one (possibly less data, but
92  * the most recent bytes, or the same data and more free space in case the
93  * buffer is enlarged). */
94 void resizeReplicationBacklog(long long newsize) {
95     if (newsize < REDIS_REPL_BACKLOG_MIN_SIZE)
96         newsize = REDIS_REPL_BACKLOG_MIN_SIZE;
97     if (server.repl_backlog_size == newsize) return;
98 
99     server.repl_backlog_size = newsize;
100     if (server.repl_backlog != NULL) {
101         /* What we actually do is to flush the old buffer and realloc a new
102          * empty one. It will refill with new data incrementally.
103          * The reason is that copying a few gigabytes adds latency and even
104          * worse often we need to alloc additional space before freeing the
105          * old buffer. */
106         zfree(server.repl_backlog);
107         server.repl_backlog = zmalloc(server.repl_backlog_size);
108         server.repl_backlog_histlen = 0;
109         server.repl_backlog_idx = 0;
110         /* Next byte we have is... the next since the buffer is empty. */
111         server.repl_backlog_off = server.master_repl_offset+1;
112     }
113 }
114 
115 void freeReplicationBacklog(void) {
116     redisAssert(listLength(server.slaves) == 0);
117     zfree(server.repl_backlog);
118     server.repl_backlog = NULL;
119 }
120 
121 /* Add data to the replication backlog.
122  * This function also increments the global replication offset stored at
123  * server.master_repl_offset, because there is no case where we want to feed
124  * the backlog without incrementing the buffer. */
125 void feedReplicationBacklog(void *ptr, size_t len) {
126     unsigned char *p = ptr;
127 
128     server.master_repl_offset += len;
129 
130     /* This is a circular buffer, so write as much data we can at every
131      * iteration and rewind the "idx" index if we reach the limit. */
132     while(len) {
133         size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
134         if (thislen > len) thislen = len;
135         memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
136         server.repl_backlog_idx += thislen;
137         if (server.repl_backlog_idx == server.repl_backlog_size)
138             server.repl_backlog_idx = 0;
139         len -= thislen;
140         p += thislen;
141         server.repl_backlog_histlen += thislen;
142     }
143     if (server.repl_backlog_histlen > server.repl_backlog_size)
144         server.repl_backlog_histlen = server.repl_backlog_size;
145     /* Set the offset of the first byte we have in the backlog. */
146     server.repl_backlog_off = server.master_repl_offset -
147                               server.repl_backlog_histlen + 1;
148 }
149 
150 /* Wrapper for feedReplicationBacklog() that takes Redis string objects
151  * as input. */
152 void feedReplicationBacklogWithObject(robj *o) {
153     char llstr[REDIS_LONGSTR_SIZE];
154     void *p;
155     size_t len;
156 
157     if (o->encoding == REDIS_ENCODING_INT) {
158         len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
159         p = llstr;
160     } else {
161         len = sdslen(o->ptr);
162         p = o->ptr;
163     }
164     feedReplicationBacklog(p,len);
165 }
166 
167 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
168     listNode *ln;
169     listIter li;
170     int j, len;
171     char llstr[REDIS_LONGSTR_SIZE];
172 
173     /* If there aren't slaves, and there is no backlog buffer to populate,
174      * we can return ASAP. */
175     if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
176 
177     /* We can't have slaves attached and no backlog. */
178     redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
179 
180     /* Send SELECT command to every slave if needed. */
181     if (server.slaveseldb != dictid) {
182         robj *selectcmd;
183 
184         /* For a few DBs we have pre-computed SELECT command. */
185         if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
186             selectcmd = shared.select[dictid];
187         } else {
188             int dictid_len;
189 
190             dictid_len = ll2string(llstr,sizeof(llstr),dictid);
191             selectcmd = createObject(REDIS_STRING,
192                 sdscatprintf(sdsempty(),
193                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
194                 dictid_len, llstr));
195         }
196 
197         /* Add the SELECT command into the backlog. */
198         if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
199 
200         /* Send it to slaves. */
201         listRewind(slaves,&li);
202         while((ln = listNext(&li))) {
203             redisClient *slave = ln->value;
204             addReply(slave,selectcmd);
205         }
206 
207         if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
208             decrRefCount(selectcmd);
209     }
210     server.slaveseldb = dictid;
211 
212     /* Write the command to the replication backlog if any. */
213     if (server.repl_backlog) {
214         char aux[REDIS_LONGSTR_SIZE+3];
215 
216         /* Add the multi bulk reply length. */
217         aux[0] = '*';
218         len = ll2string(aux+1,sizeof(aux)-1,argc);
219         aux[len+1] = '\r';
220         aux[len+2] = '\n';
221         feedReplicationBacklog(aux,len+3);
222 
223         for (j = 0; j < argc; j++) {
224             long objlen = stringObjectLen(argv[j]);
225 
226             /* We need to feed the buffer with the object as a bulk reply
227              * not just as a plain string, so create the $..CRLF payload len
228              * and add the final CRLF */
229             aux[0] = '$';
230             len = ll2string(aux+1,sizeof(aux)-1,objlen);
231             aux[len+1] = '\r';
232             aux[len+2] = '\n';
233             feedReplicationBacklog(aux,len+3);
234             feedReplicationBacklogWithObject(argv[j]);
235             feedReplicationBacklog(aux+len+1,2);
236         }
237     }
238 
239     /* Write the command to every slave. */
240     listRewind(server.slaves,&li);
241     while((ln = listNext(&li))) {
242         redisClient *slave = ln->value;
243 
244         /* Don't feed slaves that are still waiting for BGSAVE to start */
245         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
246 
247         /* Feed slaves that are waiting for the initial SYNC (so these commands
248          * are queued in the output buffer until the initial SYNC completes),
249          * or are already in sync with the master. */
250 
251         /* Add the multi bulk length. */
252         addReplyMultiBulkLen(slave,argc);
253 
254         /* Finally any additional argument that was not stored inside the
255          * static buffer if any (from j to argc). */
256         for (j = 0; j < argc; j++)
257             addReplyBulk(slave,argv[j]);
258     }
259 }
260 
261 void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
262     listNode *ln;
263     listIter li;
264     int j;
265     sds cmdrepr = sdsnew("+");
266     robj *cmdobj;
267     struct timeval tv;
268 
269     gettimeofday(&tv,NULL);
270     cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
271     if (c->flags & REDIS_LUA_CLIENT) {
272         cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
273     } else if (c->flags & REDIS_UNIX_SOCKET) {
274         cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
275     } else {
276         cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
277     }
278 
279     for (j = 0; j < argc; j++) {
280         if (argv[j]->encoding == REDIS_ENCODING_INT) {
281             cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
282         } else {
283             cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
284                         sdslen(argv[j]->ptr));
285         }
286         if (j != argc-1)
287             cmdrepr = sdscatlen(cmdrepr," ",1);
288     }
289     cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
290     cmdobj = createObject(REDIS_STRING,cmdrepr);
291 
292     listRewind(monitors,&li);
293     while((ln = listNext(&li))) {
294         redisClient *monitor = ln->value;
295         addReply(monitor,cmdobj);
296     }
297     decrRefCount(cmdobj);
298 }
299 
300 /* Feed the slave 'c' with the replication backlog starting from the
301  * specified 'offset' up to the end of the backlog. */
302 long long addReplyReplicationBacklog(redisClient *c, long long offset) {
303     long long j, skip, len;
304 
305     redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset);
306 
307     if (server.repl_backlog_histlen == 0) {
308         redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero");
309         return 0;
310     }
311 
312     redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld",
313              server.repl_backlog_size);
314     redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld",
315              server.repl_backlog_off);
316     redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld",
317              server.repl_backlog_histlen);
318     redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld",
319              server.repl_backlog_idx);
320 
321     /* Compute the amount of bytes we need to discard. */
322     skip = offset - server.repl_backlog_off;
323     redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip);
324 
325     /* Point j to the oldest byte, that is actaully our
326      * server.repl_backlog_off byte. */
327     j = (server.repl_backlog_idx +
328         (server.repl_backlog_size-server.repl_backlog_histlen)) %
329         server.repl_backlog_size;
330     redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j);
331 
332     /* Discard the amount of data to seek to the specified 'offset'. */
333     j = (j + skip) % server.repl_backlog_size;
334 
335     /* Feed slave with data. Since it is a circular buffer we have to
336      * split the reply in two parts if we are cross-boundary. */
337     len = server.repl_backlog_histlen - skip;
338     redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len);
339     while(len) {
340         long long thislen =
341             ((server.repl_backlog_size - j) < len) ?
342             (server.repl_backlog_size - j) : len;
343 
344         redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
345         addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
346         len -= thislen;
347         j = 0;
348     }
349     return server.repl_backlog_histlen - skip;
350 }
351 
352 /* This function handles the PSYNC command from the point of view of a
353  * master receiving a request for partial resynchronization.
354  *
355  * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
356  * with the usual full resync. */
357 int masterTryPartialResynchronization(redisClient *c) {
358     long long psync_offset, psync_len;
359     char *master_runid = c->argv[1]->ptr;
360     char buf[128];
361     int buflen;
362 
363     /* Is the runid of this master the same advertised by the wannabe slave
364      * via PSYNC? If runid changed this master is a different instance and
365      * there is no way to continue. */
366     if (strcasecmp(master_runid, server.runid)) {
367         /* Run id "?" is used by slaves that want to force a full resync. */
368         if (master_runid[0] != '?') {
369             redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
370                 "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
371                 master_runid, server.runid);
372         } else {
373             redisLog(REDIS_NOTICE,"Full resync requested by slave %s",
374                 replicationGetSlaveName(c));
375         }
376         goto need_full_resync;
377     }
378 
379     /* We still have the data our slave is asking for? */
380     if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
381        REDIS_OK) goto need_full_resync;
382     if (!server.repl_backlog ||
383         psync_offset < server.repl_backlog_off ||
384         psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
385     {
386         redisLog(REDIS_NOTICE,
387             "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
388         if (psync_offset > server.master_repl_offset) {
389             redisLog(REDIS_WARNING,
390                 "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
391         }
392         goto need_full_resync;
393     }
394 
395     /* If we reached this point, we are able to perform a partial resync:
396      * 1) Set client state to make it a slave.
397      * 2) Inform the client we can continue with +CONTINUE
398      * 3) Send the backlog data (from the offset to the end) to the slave. */
399     c->flags |= REDIS_SLAVE;
400     c->replstate = REDIS_REPL_ONLINE;
401     c->repl_ack_time = server.unixtime;
402     c->repl_put_online_on_ack = 0;
403     listAddNodeTail(server.slaves,c);
404     /* We can't use the connection buffers since they are used to accumulate
405      * new commands at this stage. But we are sure the socket send buffer is
406      * empty so this write will never fail actually. */
407     buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
408     if (write(c->fd,buf,buflen) != buflen) {
409         freeClientAsync(c);
410         return REDIS_OK;
411     }
412     psync_len = addReplyReplicationBacklog(c,psync_offset);
413     redisLog(REDIS_NOTICE,
414         "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
415             replicationGetSlaveName(c),
416             psync_len, psync_offset);
417     /* Note that we don't need to set the selected DB at server.slaveseldb
418      * to -1 to force the master to emit SELECT, since the slave already
419      * has this state from the previous connection with the master. */
420 
421     refreshGoodSlavesCount();
422     return REDIS_OK; /* The caller can return, no full resync needed. */
423 
424 need_full_resync:
425     /* We need a full resync for some reason... notify the client. */
426     psync_offset = server.master_repl_offset;
427     /* Add 1 to psync_offset if it the replication backlog does not exists
428      * as when it will be created later we'll increment the offset by one. */
429     if (server.repl_backlog == NULL) psync_offset++;
430     /* Again, we can't use the connection buffers (see above). */
431     buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
432                       server.runid,psync_offset);
433     if (write(c->fd,buf,buflen) != buflen) {
434         freeClientAsync(c);
435         return REDIS_OK;
436     }
437     return REDIS_ERR;
438 }
439 
440 /* Start a BGSAVE for replication goals, which is, selecting the disk or
441  * socket target depending on the configuration, and making sure that
442  * the script cache is flushed before to start.
443  *
444  * Returns REDIS_OK on success or REDIS_ERR otherwise. */
445 int startBgsaveForReplication(void) {
446     int retval;
447 
448     redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s",
449         server.repl_diskless_sync ? "slaves sockets" : "disk");
450 
451     if (server.repl_diskless_sync)
452         retval = rdbSaveToSlavesSockets();
453     else
454         retval = rdbSaveBackground(server.rdb_filename);
455 
456     /* Flush the script cache, since we need that slave differences are
457      * accumulated without requiring slaves to match our cached scripts. */
458     if (retval == REDIS_OK) replicationScriptCacheFlush();
459     return retval;
460 }
461 
462 /* SYNC and PSYNC command implemenation. */
463 void syncCommand(redisClient *c) {
464     /* ignore SYNC if already slave or in monitor mode */
465     if (c->flags & REDIS_SLAVE) return;
466 
467     /* Refuse SYNC requests if we are a slave but the link with our master
468      * is not ok... */
469     if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
470         addReplyError(c,"Can't SYNC while not connected with my master");
471         return;
472     }
473 
474     /* SYNC can't be issued when the server has pending data to send to
475      * the client about already issued commands. We need a fresh reply
476      * buffer registering the differences between the BGSAVE and the current
477      * dataset, so that we can copy to other slaves if needed. */
478     if (listLength(c->reply) != 0 || c->bufpos != 0) {
479         addReplyError(c,"SYNC and PSYNC are invalid with pending output");
480         return;
481     }
482 
483     redisLog(REDIS_NOTICE,"Slave %s asks for synchronization",
484         replicationGetSlaveName(c));
485 
486     /* Try a partial resynchronization if this is a PSYNC command.
487      * If it fails, we continue with usual full resynchronization, however
488      * when this happens masterTryPartialResynchronization() already
489      * replied with:
490      *
491      * +FULLRESYNC <runid> <offset>
492      *
493      * So the slave knows the new runid and offset to try a PSYNC later
494      * if the connection with the master is lost. */
495     if (!strcasecmp(c->argv[0]->ptr,"psync")) {
496         if (masterTryPartialResynchronization(c) == REDIS_OK) {
497             server.stat_sync_partial_ok++;
498             return; /* No full resync needed, return. */
499         } else {
500             char *master_runid = c->argv[1]->ptr;
501 
502             /* Increment stats for failed PSYNCs, but only if the
503              * runid is not "?", as this is used by slaves to force a full
504              * resync on purpose when they are not albe to partially
505              * resync. */
506             if (master_runid[0] != '?') server.stat_sync_partial_err++;
507         }
508     } else {
509         /* If a slave uses SYNC, we are dealing with an old implementation
510          * of the replication protocol (like redis-cli --slave). Flag the client
511          * so that we don't expect to receive REPLCONF ACK feedbacks. */
512         c->flags |= REDIS_PRE_PSYNC;
513     }
514 
515     /* Full resynchronization. */
516     server.stat_sync_full++;
517 
518     /* Here we need to check if there is a background saving operation
519      * in progress, or if it is required to start one */
520     if (server.rdb_child_pid != -1 &&
521         server.rdb_child_type == REDIS_RDB_CHILD_TYPE_DISK)
522     {
523         /* Ok a background save is in progress. Let's check if it is a good
524          * one for replication, i.e. if there is another slave that is
525          * registering differences since the server forked to save. */
526         redisClient *slave;
527         listNode *ln;
528         listIter li;
529 
530         listRewind(server.slaves,&li);
531         while((ln = listNext(&li))) {
532             slave = ln->value;
533             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
534         }
535         if (ln) {
536             /* Perfect, the server is already registering differences for
537              * another slave. Set the right state, and copy the buffer. */
538             copyClientOutputBuffer(c,slave);
539             c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
540             redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
541         } else {
542             /* No way, we need to wait for the next BGSAVE in order to
543              * register differences. */
544             c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
545             redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
546         }
547     } else if (server.rdb_child_pid != -1 &&
548                server.rdb_child_type == REDIS_RDB_CHILD_TYPE_SOCKET)
549     {
550         /* There is an RDB child process but it is writing directly to
551          * children sockets. We need to wait for the next BGSAVE
552          * in order to synchronize. */
553         c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
554         redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
555     } else {
556         if (server.repl_diskless_sync) {
557             /* Diskless replication RDB child is created inside
558              * replicationCron() since we want to delay its start a
559              * few seconds to wait for more slaves to arrive. */
560             c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
561             if (server.repl_diskless_sync_delay)
562                 redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC");
563         } else {
564             /* Ok we don't have a BGSAVE in progress, let's start one. */
565             if (startBgsaveForReplication() != REDIS_OK) {
566                 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
567                 addReplyError(c,"Unable to perform background save");
568                 return;
569             }
570             c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
571         }
572     }
573 
574     if (server.repl_disable_tcp_nodelay)
575         anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
576     c->repldbfd = -1;
577     c->flags |= REDIS_SLAVE;
578     server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
579     listAddNodeTail(server.slaves,c);
580     if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
581         createReplicationBacklog();
582     return;
583 }
584 
585 /* REPLCONF <option> <value> <option> <value> ...
586  * This command is used by a slave in order to configure the replication
587  * process before starting it with the SYNC command.
588  *
589  * Currently the only use of this command is to communicate to the master
590  * what is the listening port of the Slave redis instance, so that the
591  * master can accurately list slaves and their listening ports in
592  * the INFO output.
593  *
594  * In the future the same command can be used in order to configure
595  * the replication to initiate an incremental replication instead of a
596  * full resync. */
597 void replconfCommand(redisClient *c) {
598     int j;
599 
600     if ((c->argc % 2) == 0) {
601         /* Number of arguments must be odd to make sure that every
602          * option has a corresponding value. */
603         addReply(c,shared.syntaxerr);
604         return;
605     }
606 
607     /* Process every option-value pair. */
608     for (j = 1; j < c->argc; j+=2) {
609         if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
610             long port;
611 
612             if ((getLongFromObjectOrReply(c,c->argv[j+1],
613                     &port,NULL) != REDIS_OK))
614                 return;
615             c->slave_listening_port = port;
616         } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
617             /* REPLCONF ACK is used by slave to inform the master the amount
618              * of replication stream that it processed so far. It is an
619              * internal only command that normal clients should never use. */
620             long long offset;
621 
622             if (!(c->flags & REDIS_SLAVE)) return;
623             if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK))
624                 return;
625             if (offset > c->repl_ack_off)
626                 c->repl_ack_off = offset;
627             c->repl_ack_time = server.unixtime;
628             /* If this was a diskless replication, we need to really put
629              * the slave online when the first ACK is received (which
630              * confirms slave is online and ready to get more data). */
631             if (c->repl_put_online_on_ack && c->replstate == REDIS_REPL_ONLINE)
632                 putSlaveOnline(c);
633             /* Note: this command does not reply anything! */
634             return;
635         } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
636             /* REPLCONF GETACK is used in order to request an ACK ASAP
637              * to the slave. */
638             if (server.masterhost && server.master) replicationSendAck();
639             /* Note: this command does not reply anything! */
640         } else {
641             addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
642                 (char*)c->argv[j]->ptr);
643             return;
644         }
645     }
646     addReply(c,shared.ok);
647 }
648 
649 /* This function puts a slave in the online state, and should be called just
650  * after a slave received the RDB file for the initial synchronization, and
651  * we are finally ready to send the incremental stream of commands.
652  *
653  * It does a few things:
654  *
655  * 1) Put the slave in ONLINE state.
656  * 2) Make sure the writable event is re-installed, since calling the SYNC
657  *    command disables it, so that we can accumulate output buffer without
658  *    sending it to the slave.
659  * 3) Update the count of good slaves. */
660 void putSlaveOnline(redisClient *slave) {
661     slave->replstate = REDIS_REPL_ONLINE;
662     slave->repl_put_online_on_ack = 0;
663     slave->repl_ack_time = server.unixtime;
664     if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
665         sendReplyToClient, slave) == AE_ERR) {
666         redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
667         freeClient(slave);
668         return;
669     }
670     refreshGoodSlavesCount();
671     redisLog(REDIS_NOTICE,"Synchronization with slave %s succeeded",
672         replicationGetSlaveName(slave));
673 }
674 
675 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
676     redisClient *slave = privdata;
677     REDIS_NOTUSED(el);
678     REDIS_NOTUSED(mask);
679     char buf[REDIS_IOBUF_LEN];
680     ssize_t nwritten, buflen;
681 
682     /* Before sending the RDB file, we send the preamble as configured by the
683      * replication process. Currently the preamble is just the bulk count of
684      * the file in the form "$<length>\r\n". */
685     if (slave->replpreamble) {
686         nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
687         if (nwritten == -1) {
688             redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",
689                 strerror(errno));
690             freeClient(slave);
691             return;
692         }
693         sdsrange(slave->replpreamble,nwritten,-1);
694         if (sdslen(slave->replpreamble) == 0) {
695             sdsfree(slave->replpreamble);
696             slave->replpreamble = NULL;
697             /* fall through sending data. */
698         } else {
699             return;
700         }
701     }
702 
703     /* If the preamble was already transfered, send the RDB bulk data. */
704     lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
705     buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
706     if (buflen <= 0) {
707         redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
708             (buflen == 0) ? "premature EOF" : strerror(errno));
709         freeClient(slave);
710         return;
711     }
712     if ((nwritten = write(fd,buf,buflen)) == -1) {
713         if (errno != EAGAIN) {
714             redisLog(REDIS_WARNING,"Write error sending DB to slave: %s",
715                 strerror(errno));
716             freeClient(slave);
717         }
718         return;
719     }
720     slave->repldboff += nwritten;
721     if (slave->repldboff == slave->repldbsize) {
722         close(slave->repldbfd);
723         slave->repldbfd = -1;
724         aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
725         putSlaveOnline(slave);
726     }
727 }
728 
729 /* This function is called at the end of every background saving,
730  * or when the replication RDB transfer strategy is modified from
731  * disk to socket or the other way around.
732  *
733  * The goal of this function is to handle slaves waiting for a successful
734  * background saving in order to perform non-blocking synchronization, and
735  * to schedule a new BGSAVE if there are slaves that attached while a
736  * BGSAVE was in progress, but it was not a good one for replication (no
737  * other slave was accumulating differences).
738  *
739  * The argument bgsaveerr is REDIS_OK if the background saving succeeded
740  * otherwise REDIS_ERR is passed to the function.
741  * The 'type' argument is the type of the child that terminated
742  * (if it had a disk or socket target). */
743 void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
744     listNode *ln;
745     int startbgsave = 0;
746     listIter li;
747 
748     listRewind(server.slaves,&li);
749     while((ln = listNext(&li))) {
750         redisClient *slave = ln->value;
751 
752         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
753             startbgsave = 1;
754             slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
755         } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
756             struct redis_stat buf;
757 
758             /* If this was an RDB on disk save, we have to prepare to send
759              * the RDB from disk to the slave socket. Otherwise if this was
760              * already an RDB -> Slaves socket transfer, used in the case of
761              * diskless replication, our work is trivial, we can just put
762              * the slave online. */
763             if (type == REDIS_RDB_CHILD_TYPE_SOCKET) {
764                 redisLog(REDIS_NOTICE,
765                     "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
766                         replicationGetSlaveName(slave));
767                 /* Note: we wait for a REPLCONF ACK message from slave in
768                  * order to really put it online (install the write handler
769                  * so that the accumulated data can be transfered). However
770                  * we change the replication state ASAP, since our slave
771                  * is technically online now. */
772                 slave->replstate = REDIS_REPL_ONLINE;
773                 slave->repl_put_online_on_ack = 1;
774             } else {
775                 if (bgsaveerr != REDIS_OK) {
776                     freeClient(slave);
777                     redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
778                     continue;
779                 }
780                 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
781                     redis_fstat(slave->repldbfd,&buf) == -1) {
782                     freeClient(slave);
783                     redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
784                     continue;
785                 }
786                 slave->repldboff = 0;
787                 slave->repldbsize = buf.st_size;
788                 slave->replstate = REDIS_REPL_SEND_BULK;
789                 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
790                     (unsigned long long) slave->repldbsize);
791 
792                 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
793                 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
794                     freeClient(slave);
795                     continue;
796                 }
797             }
798         }
799     }
800     if (startbgsave) {
801         if (startBgsaveForReplication() != REDIS_OK) {
802             listIter li;
803 
804             listRewind(server.slaves,&li);
805             redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
806             while((ln = listNext(&li))) {
807                 redisClient *slave = ln->value;
808 
809                 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
810                     freeClient(slave);
811             }
812         }
813     }
814 }
815 
816 /* ----------------------------------- SLAVE -------------------------------- */
817 
818 /* Abort the async download of the bulk dataset while SYNC-ing with master */
819 void replicationAbortSyncTransfer(void) {
820     redisAssert(server.repl_state == REDIS_REPL_TRANSFER);
821 
822     aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
823     close(server.repl_transfer_s);
824     close(server.repl_transfer_fd);
825     unlink(server.repl_transfer_tmpfile);
826     zfree(server.repl_transfer_tmpfile);
827     server.repl_state = REDIS_REPL_CONNECT;
828 }
829 
830 /* Avoid the master to detect the slave is timing out while loading the
831  * RDB file in initial synchronization. We send a single newline character
832  * that is valid protocol but is guaranteed to either be sent entierly or
833  * not, since the byte is indivisible.
834  *
835  * The function is called in two contexts: while we flush the current
836  * data with emptyDb(), and while we load the new data received as an
837  * RDB file from the master. */
838 void replicationSendNewlineToMaster(void) {
839     static time_t newline_sent;
840     if (time(NULL) != newline_sent) {
841         newline_sent = time(NULL);
842         if (write(server.repl_transfer_s,"\n",1) == -1) {
843             /* Pinging back in this stage is best-effort. */
844         }
845     }
846 }
847 
848 /* Callback used by emptyDb() while flushing away old data to load
849  * the new dataset received by the master. */
850 void replicationEmptyDbCallback(void *privdata) {
851     REDIS_NOTUSED(privdata);
852     replicationSendNewlineToMaster();
853 }
854 
855 /* Asynchronously read the SYNC payload we receive from a master */
856 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
857 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
858     char buf[4096];
859     ssize_t nread, readlen;
860     off_t left;
861     REDIS_NOTUSED(el);
862     REDIS_NOTUSED(privdata);
863     REDIS_NOTUSED(mask);
864 
865     /* Static vars used to hold the EOF mark, and the last bytes received
866      * form the server: when they match, we reached the end of the transfer. */
867     static char eofmark[REDIS_RUN_ID_SIZE];
868     static char lastbytes[REDIS_RUN_ID_SIZE];
869     static int usemark = 0;
870 
871     /* If repl_transfer_size == -1 we still have to read the bulk length
872      * from the master reply. */
873     if (server.repl_transfer_size == -1) {
874         if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
875             redisLog(REDIS_WARNING,
876                 "I/O error reading bulk count from MASTER: %s",
877                 strerror(errno));
878             goto error;
879         }
880 
881         if (buf[0] == '-') {
882             redisLog(REDIS_WARNING,
883                 "MASTER aborted replication with an error: %s",
884                 buf+1);
885             goto error;
886         } else if (buf[0] == '\0') {
887             /* At this stage just a newline works as a PING in order to take
888              * the connection live. So we refresh our last interaction
889              * timestamp. */
890             server.repl_transfer_lastio = server.unixtime;
891             return;
892         } else if (buf[0] != '$') {
893             redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
894             goto error;
895         }
896 
897         /* There are two possible forms for the bulk payload. One is the
898          * usual $<count> bulk format. The other is used for diskless transfers
899          * when the master does not know beforehand the size of the file to
900          * transfer. In the latter case, the following format is used:
901          *
902          * $EOF:<40 bytes delimiter>
903          *
904          * At the end of the file the announced delimiter is transmitted. The
905          * delimiter is long and random enough that the probability of a
906          * collision with the actual file content can be ignored. */
907         if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) {
908             usemark = 1;
909             memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE);
910             memset(lastbytes,0,REDIS_RUN_ID_SIZE);
911             /* Set any repl_transfer_size to avoid entering this code path
912              * at the next call. */
913             server.repl_transfer_size = 0;
914             redisLog(REDIS_NOTICE,
915                 "MASTER <-> SLAVE sync: receiving streamed RDB from master");
916         } else {
917             usemark = 0;
918             server.repl_transfer_size = strtol(buf+1,NULL,10);
919             redisLog(REDIS_NOTICE,
920                 "MASTER <-> SLAVE sync: receiving %lld bytes from master",
921                 (long long) server.repl_transfer_size);
922         }
923         return;
924     }
925 
926     /* Read bulk data */
927     if (usemark) {
928         readlen = sizeof(buf);
929     } else {
930         left = server.repl_transfer_size - server.repl_transfer_read;
931         readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
932     }
933 
934     nread = read(fd,buf,readlen);
935     if (nread <= 0) {
936         redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
937             (nread == -1) ? strerror(errno) : "connection lost");
938         replicationAbortSyncTransfer();
939         return;
940     }
941 
942     /* When a mark is used, we want to detect EOF asap in order to avoid
943      * writing the EOF mark into the file... */
944     int eof_reached = 0;
945 
946     if (usemark) {
947         /* Update the last bytes array, and check if it matches our delimiter.*/
948         if (nread >= REDIS_RUN_ID_SIZE) {
949             memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE);
950         } else {
951             int rem = REDIS_RUN_ID_SIZE-nread;
952             memmove(lastbytes,lastbytes+nread,rem);
953             memcpy(lastbytes+rem,buf,nread);
954         }
955         if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1;
956     }
957 
958     server.repl_transfer_lastio = server.unixtime;
959     if (write(server.repl_transfer_fd,buf,nread) != nread) {
960         redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
961         goto error;
962     }
963     server.repl_transfer_read += nread;
964 
965     /* Delete the last 40 bytes from the file if we reached EOF. */
966     if (usemark && eof_reached) {
967         if (ftruncate(server.repl_transfer_fd,
968             server.repl_transfer_read - REDIS_RUN_ID_SIZE) == -1)
969         {
970             redisLog(REDIS_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
971             goto error;
972         }
973     }
974 
975     /* Sync data on disk from time to time, otherwise at the end of the transfer
976      * we may suffer a big delay as the memory buffers are copied into the
977      * actual disk. */
978     if (server.repl_transfer_read >=
979         server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
980     {
981         off_t sync_size = server.repl_transfer_read -
982                           server.repl_transfer_last_fsync_off;
983         rdb_fsync_range(server.repl_transfer_fd,
984             server.repl_transfer_last_fsync_off, sync_size);
985         server.repl_transfer_last_fsync_off += sync_size;
986     }
987 
988     /* Check if the transfer is now complete */
989     if (!usemark) {
990         if (server.repl_transfer_read == server.repl_transfer_size)
991             eof_reached = 1;
992     }
993 
994     if (eof_reached) {
995         if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
996             redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
997             replicationAbortSyncTransfer();
998             return;
999         }
1000         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
1001         signalFlushedDb(-1);
1002         emptyDb(replicationEmptyDbCallback);
1003         /* Before loading the DB into memory we need to delete the readable
1004          * handler, otherwise it will get called recursively since
1005          * rdbLoad() will call the event loop to process events from time to
1006          * time for non blocking loading. */
1007         aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
1008         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
1009         if (rdbLoad(server.rdb_filename) != REDIS_OK) {
1010             redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
1011             replicationAbortSyncTransfer();
1012             return;
1013         }
1014         /* Final setup of the connected slave <- master link */
1015         zfree(server.repl_transfer_tmpfile);
1016         close(server.repl_transfer_fd);
1017         server.master = createClient(server.repl_transfer_s);
1018         server.master->flags |= REDIS_MASTER;
1019         server.master->authenticated = 1;
1020         server.repl_state = REDIS_REPL_CONNECTED;
1021         server.master->reploff = server.repl_master_initial_offset;
1022         memcpy(server.master->replrunid, server.repl_master_runid,
1023             sizeof(server.repl_master_runid));
1024         /* If master offset is set to -1, this master is old and is not
1025          * PSYNC capable, so we flag it accordingly. */
1026         if (server.master->reploff == -1)
1027             server.master->flags |= REDIS_PRE_PSYNC;
1028         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
1029         /* Restart the AOF subsystem now that we finished the sync. This
1030          * will trigger an AOF rewrite, and when done will start appending
1031          * to the new file. */
1032         if (server.aof_state != REDIS_AOF_OFF) {
1033             int retry = 10;
1034 
1035             stopAppendOnly();
1036             while (retry-- && startAppendOnly() == REDIS_ERR) {
1037                 redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
1038                 sleep(1);
1039             }
1040             if (!retry) {
1041                 redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
1042                 exit(1);
1043             }
1044         }
1045     }
1046 
1047     return;
1048 
1049 error:
1050     replicationAbortSyncTransfer();
1051     return;
1052 }
1053 
1054 /* Send a synchronous command to the master. Used to send AUTH and
1055  * REPLCONF commands before starting the replication with SYNC.
1056  *
1057  * The command returns an sds string representing the result of the
1058  * operation. On error the first byte is a "-".
1059  */
1060 char *sendSynchronousCommand(int fd, ...) {
1061     va_list ap;
1062     sds cmd = sdsempty();
1063     char *arg, buf[256];
1064 
1065     /* Create the command to send to the master, we use simple inline
1066      * protocol for simplicity as currently we only send simple strings. */
1067     va_start(ap,fd);
1068     while(1) {
1069         arg = va_arg(ap, char*);
1070         if (arg == NULL) break;
1071 
1072         if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
1073         cmd = sdscat(cmd,arg);
1074     }
1075     cmd = sdscatlen(cmd,"\r\n",2);
1076 
1077     /* Transfer command to the server. */
1078     if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
1079         sdsfree(cmd);
1080         return sdscatprintf(sdsempty(),"-Writing to master: %s",
1081                 strerror(errno));
1082     }
1083     sdsfree(cmd);
1084 
1085     /* Read the reply from the server. */
1086     if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
1087     {
1088         return sdscatprintf(sdsempty(),"-Reading from master: %s",
1089                 strerror(errno));
1090     }
1091     return sdsnew(buf);
1092 }
1093 
1094 /* Try a partial resynchronization with the master if we are about to reconnect.
1095  * If there is no cached master structure, at least try to issue a
1096  * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
1097  * command in order to obtain the master run id and the master replication
1098  * global offset.
1099  *
1100  * This function is designed to be called from syncWithMaster(), so the
1101  * following assumptions are made:
1102  *
1103  * 1) We pass the function an already connected socket "fd".
1104  * 2) This function does not close the file descriptor "fd". However in case
1105  *    of successful partial resynchronization, the function will reuse
1106  *    'fd' as file descriptor of the server.master client structure.
1107  *
1108  * The function returns:
1109  *
1110  * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue.
1111  * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
1112  *                   In this case the master run_id and global replication
1113  *                   offset is saved.
1114  * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
1115  *                      the caller should fall back to SYNC.
1116  */
1117 
1118 #define PSYNC_CONTINUE 0
1119 #define PSYNC_FULLRESYNC 1
1120 #define PSYNC_NOT_SUPPORTED 2
1121 int slaveTryPartialResynchronization(int fd) {
1122     char *psync_runid;
1123     char psync_offset[32];
1124     sds reply;
1125 
1126     /* Initially set repl_master_initial_offset to -1 to mark the current
1127      * master run_id and offset as not valid. Later if we'll be able to do
1128      * a FULL resync using the PSYNC command we'll set the offset at the
1129      * right value, so that this information will be propagated to the
1130      * client structure representing the master into server.master. */
1131     server.repl_master_initial_offset = -1;
1132 
1133     if (server.cached_master) {
1134         psync_runid = server.cached_master->replrunid;
1135         snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
1136         redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
1137     } else {
1138         redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
1139         psync_runid = "?";
1140         memcpy(psync_offset,"-1",3);
1141     }
1142 
1143     /* Issue the PSYNC command */
1144     reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
1145 
1146     if (!strncmp(reply,"+FULLRESYNC",11)) {
1147         char *runid = NULL, *offset = NULL;
1148 
1149         /* FULL RESYNC, parse the reply in order to extract the run id
1150          * and the replication offset. */
1151         runid = strchr(reply,' ');
1152         if (runid) {
1153             runid++;
1154             offset = strchr(runid,' ');
1155             if (offset) offset++;
1156         }
1157         if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
1158             redisLog(REDIS_WARNING,
1159                 "Master replied with wrong +FULLRESYNC syntax.");
1160             /* This is an unexpected condition, actually the +FULLRESYNC
1161              * reply means that the master supports PSYNC, but the reply
1162              * format seems wrong. To stay safe we blank the master
1163              * runid to make sure next PSYNCs will fail. */
1164             memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
1165         } else {
1166             memcpy(server.repl_master_runid, runid, offset-runid-1);
1167             server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
1168             server.repl_master_initial_offset = strtoll(offset,NULL,10);
1169             redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
1170                 server.repl_master_runid,
1171                 server.repl_master_initial_offset);
1172         }
1173         /* We are going to full resync, discard the cached master structure. */
1174         replicationDiscardCachedMaster();
1175         sdsfree(reply);
1176         return PSYNC_FULLRESYNC;
1177     }
1178 
1179     if (!strncmp(reply,"+CONTINUE",9)) {
1180         /* Partial resync was accepted, set the replication state accordingly */
1181         redisLog(REDIS_NOTICE,
1182             "Successful partial resynchronization with master.");
1183         sdsfree(reply);
1184         replicationResurrectCachedMaster(fd);
1185         return PSYNC_CONTINUE;
1186     }
1187 
1188     /* If we reach this point we receied either an error since the master does
1189      * not understand PSYNC, or an unexpected reply from the master.
1190      * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
1191 
1192     if (strncmp(reply,"-ERR",4)) {
1193         /* If it's not an error, log the unexpected event. */
1194         redisLog(REDIS_WARNING,
1195             "Unexpected reply to PSYNC from master: %s", reply);
1196     } else {
1197         redisLog(REDIS_NOTICE,
1198             "Master does not support PSYNC or is in "
1199             "error state (reply: %s)", reply);
1200     }
1201     sdsfree(reply);
1202     replicationDiscardCachedMaster();
1203     return PSYNC_NOT_SUPPORTED;
1204 }
1205 
1206 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
1207     char tmpfile[256], *err;
1208     int dfd, maxtries = 5;
1209     int sockerr = 0, psync_result;
1210     socklen_t errlen = sizeof(sockerr);
1211     REDIS_NOTUSED(el);
1212     REDIS_NOTUSED(privdata);
1213     REDIS_NOTUSED(mask);
1214 
1215     /* If this event fired after the user turned the instance into a master
1216      * with SLAVEOF NO ONE we must just return ASAP. */
1217     if (server.repl_state == REDIS_REPL_NONE) {
1218         close(fd);
1219         return;
1220     }
1221 
1222     /* Check for errors in the socket. */
1223     if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
1224         sockerr = errno;
1225     if (sockerr) {
1226         aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
1227         redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
1228             strerror(sockerr));
1229         goto error;
1230     }
1231 
1232     /* If we were connecting, it's time to send a non blocking PING, we want to
1233      * make sure the master is able to reply before going into the actual
1234      * replication process where we have long timeouts in the order of
1235      * seconds (in the meantime the slave would block). */
1236     if (server.repl_state == REDIS_REPL_CONNECTING) {
1237         redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
1238         /* Delete the writable event so that the readable event remains
1239          * registered and we can wait for the PONG reply. */
1240         aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
1241         server.repl_state = REDIS_REPL_RECEIVE_PONG;
1242         /* Send the PING, don't check for errors at all, we have the timeout
1243          * that will take care about this. */
1244         syncWrite(fd,"PING\r\n",6,100);
1245         return;
1246     }
1247 
1248     /* Receive the PONG command. */
1249     if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
1250         char buf[1024];
1251 
1252         /* Delete the readable event, we no longer need it now that there is
1253          * the PING reply to read. */
1254         aeDeleteFileEvent(server.el,fd,AE_READABLE);
1255 
1256         /* Read the reply with explicit timeout. */
1257         buf[0] = '\0';
1258         if (syncReadLine(fd,buf,sizeof(buf),
1259             server.repl_syncio_timeout*1000) == -1)
1260         {
1261             redisLog(REDIS_WARNING,
1262                 "I/O error reading PING reply from master: %s",
1263                 strerror(errno));
1264             goto error;
1265         }
1266 
1267         /* We accept only two replies as valid, a positive +PONG reply
1268          * (we just check for "+") or an authentication error.
1269          * Note that older versions of Redis replied with "operation not
1270          * permitted" instead of using a proper error code, so we test
1271          * both. */
1272         if (buf[0] != '+' &&
1273             strncmp(buf,"-NOAUTH",7) != 0 &&
1274             strncmp(buf,"-ERR operation not permitted",28) != 0)
1275         {
1276             redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
1277             goto error;
1278         } else {
1279             redisLog(REDIS_NOTICE,
1280                 "Master replied to PING, replication can continue...");
1281         }
1282     }
1283 
1284     /* AUTH with the master if required. */
1285     if(server.masterauth) {
1286         err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
1287         if (err[0] == '-') {
1288             redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
1289             sdsfree(err);
1290             goto error;
1291         }
1292         sdsfree(err);
1293     }
1294 
1295     /* Set the slave port, so that Master's INFO command can list the
1296      * slave listening port correctly. */
1297     {
1298         sds port = sdsfromlonglong(server.port);
1299         err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
1300                                          NULL);
1301         sdsfree(port);
1302         /* Ignore the error if any, not all the Redis versions support
1303          * REPLCONF listening-port. */
1304         if (err[0] == '-') {
1305             redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
1306         }
1307         sdsfree(err);
1308     }
1309 
1310     /* Try a partial resynchonization. If we don't have a cached master
1311      * slaveTryPartialResynchronization() will at least try to use PSYNC
1312      * to start a full resynchronization so that we get the master run id
1313      * and the global offset, to try a partial resync at the next
1314      * reconnection attempt. */
1315     psync_result = slaveTryPartialResynchronization(fd);
1316     if (psync_result == PSYNC_CONTINUE) {
1317         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
1318         return;
1319     }
1320 
1321     /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
1322      * and the server.repl_master_runid and repl_master_initial_offset are
1323      * already populated. */
1324     if (psync_result == PSYNC_NOT_SUPPORTED) {
1325         redisLog(REDIS_NOTICE,"Retrying with SYNC...");
1326         if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
1327             redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
1328                 strerror(errno));
1329             goto error;
1330         }
1331     }
1332 
1333     /* Prepare a suitable temp file for bulk transfer */
1334     while(maxtries--) {
1335         snprintf(tmpfile,256,
1336             "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
1337         dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
1338         if (dfd != -1) break;
1339         sleep(1);
1340     }
1341     if (dfd == -1) {
1342         redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
1343         goto error;
1344     }
1345 
1346     /* Setup the non blocking download of the bulk file. */
1347     if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
1348             == AE_ERR)
1349     {
1350         redisLog(REDIS_WARNING,
1351             "Can't create readable event for SYNC: %s (fd=%d)",
1352             strerror(errno),fd);
1353         goto error;
1354     }
1355 
1356     server.repl_state = REDIS_REPL_TRANSFER;
1357     server.repl_transfer_size = -1;
1358     server.repl_transfer_read = 0;
1359     server.repl_transfer_last_fsync_off = 0;
1360     server.repl_transfer_fd = dfd;
1361     server.repl_transfer_lastio = server.unixtime;
1362     server.repl_transfer_tmpfile = zstrdup(tmpfile);
1363     return;
1364 
1365 error:
1366     close(fd);
1367     server.repl_transfer_s = -1;
1368     server.repl_state = REDIS_REPL_CONNECT;
1369     return;
1370 }
1371 
1372 int connectWithMaster(void) {
1373     int fd;
1374 
1375     fd = anetTcpNonBlockBindConnect(NULL,
1376         server.masterhost,server.masterport,REDIS_BIND_ADDR);
1377     if (fd == -1) {
1378         redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
1379             strerror(errno));
1380         return REDIS_ERR;
1381     }
1382 
1383     if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
1384             AE_ERR)
1385     {
1386         close(fd);
1387         redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
1388         return REDIS_ERR;
1389     }
1390 
1391     server.repl_transfer_lastio = server.unixtime;
1392     server.repl_transfer_s = fd;
1393     server.repl_state = REDIS_REPL_CONNECTING;
1394     return REDIS_OK;
1395 }
1396 
1397 /* This function can be called when a non blocking connection is currently
1398  * in progress to undo it. */
1399 void undoConnectWithMaster(void) {
1400     int fd = server.repl_transfer_s;
1401 
1402     redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
1403                 server.repl_state == REDIS_REPL_RECEIVE_PONG);
1404     aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
1405     close(fd);
1406     server.repl_transfer_s = -1;
1407     server.repl_state = REDIS_REPL_CONNECT;
1408 }
1409 
1410 /* This function aborts a non blocking replication attempt if there is one
1411  * in progress, by canceling the non-blocking connect attempt or
1412  * the initial bulk transfer.
1413  *
1414  * If there was a replication handshake in progress 1 is returned and
1415  * the replication state (server.repl_state) set to REDIS_REPL_CONNECT.
1416  *
1417  * Otherwise zero is returned and no operation is perforemd at all. */
1418 int cancelReplicationHandshake(void) {
1419     if (server.repl_state == REDIS_REPL_TRANSFER) {
1420         replicationAbortSyncTransfer();
1421     } else if (server.repl_state == REDIS_REPL_CONNECTING ||
1422              server.repl_state == REDIS_REPL_RECEIVE_PONG)
1423     {
1424         undoConnectWithMaster();
1425     } else {
1426         return 0;
1427     }
1428     return 1;
1429 }
1430 
1431 /* Set replication to the specified master address and port. */
1432 void replicationSetMaster(char *ip, int port) {
1433     sdsfree(server.masterhost);
1434     server.masterhost = sdsnew(ip);
1435     server.masterport = port;
1436     if (server.master) freeClient(server.master);
1437     disconnectSlaves(); /* Force our slaves to resync with us as well. */
1438     replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
1439     freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
1440     cancelReplicationHandshake();
1441     server.repl_state = REDIS_REPL_CONNECT;
1442     server.master_repl_offset = 0;
1443     server.repl_down_since = 0;
1444 }
1445 
1446 /* Cancel replication, setting the instance as a master itself. */
1447 void replicationUnsetMaster(void) {
1448     if (server.masterhost == NULL) return; /* Nothing to do. */
1449     sdsfree(server.masterhost);
1450     server.masterhost = NULL;
1451     if (server.master) {
1452         if (listLength(server.slaves) == 0) {
1453             /* If this instance is turned into a master and there are no
1454              * slaves, it inherits the replication offset from the master.
1455              * Under certain conditions this makes replicas comparable by
1456              * replication offset to understand what is the most updated. */
1457             server.master_repl_offset = server.master->reploff;
1458             freeReplicationBacklog();
1459         }
1460         freeClient(server.master);
1461     }
1462     replicationDiscardCachedMaster();
1463     cancelReplicationHandshake();
1464     server.repl_state = REDIS_REPL_NONE;
1465 }
1466 
1467 void slaveofCommand(redisClient *c) {
1468     /* SLAVEOF is not allowed in cluster mode as replication is automatically
1469      * configured using the current address of the master node. */
1470     if (server.cluster_enabled) {
1471         addReplyError(c,"SLAVEOF not allowed in cluster mode.");
1472         return;
1473     }
1474 
1475     /* The special host/port combination "NO" "ONE" turns the instance
1476      * into a master. Otherwise the new master address is set. */
1477     if (!strcasecmp(c->argv[1]->ptr,"no") &&
1478         !strcasecmp(c->argv[2]->ptr,"one")) {
1479         if (server.masterhost) {
1480             replicationUnsetMaster();
1481             redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
1482         }
1483     } else {
1484         long port;
1485 
1486         if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
1487             return;
1488 
1489         /* Check if we are already attached to the specified slave */
1490         if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
1491             && server.masterport == port) {
1492             redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
1493             addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
1494             return;
1495         }
1496         /* There was no previous master or the user specified a different one,
1497          * we can continue. */
1498         replicationSetMaster(c->argv[1]->ptr, port);
1499         redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
1500             server.masterhost, server.masterport);
1501     }
1502     addReply(c,shared.ok);
1503 }
1504 
1505 /* ROLE command: provide information about the role of the instance
1506  * (master or slave) and additional information related to replication
1507  * in an easy to process format. */
1508 void roleCommand(redisClient *c) {
1509     if (server.masterhost == NULL) {
1510         listIter li;
1511         listNode *ln;
1512         void *mbcount;
1513         int slaves = 0;
1514 
1515         addReplyMultiBulkLen(c,3);
1516         addReplyBulkCBuffer(c,"master",6);
1517         addReplyLongLong(c,server.master_repl_offset);
1518         mbcount = addDeferredMultiBulkLength(c);
1519         listRewind(server.slaves,&li);
1520         while((ln = listNext(&li))) {
1521             redisClient *slave = ln->value;
1522             char ip[REDIS_IP_STR_LEN];
1523 
1524             if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1) continue;
1525             if (slave->replstate != REDIS_REPL_ONLINE) continue;
1526             addReplyMultiBulkLen(c,3);
1527             addReplyBulkCString(c,ip);
1528             addReplyBulkLongLong(c,slave->slave_listening_port);
1529             addReplyBulkLongLong(c,slave->repl_ack_off);
1530             slaves++;
1531         }
1532         setDeferredMultiBulkLength(c,mbcount,slaves);
1533     } else {
1534         char *slavestate = NULL;
1535 
1536         addReplyMultiBulkLen(c,5);
1537         addReplyBulkCBuffer(c,"slave",5);
1538         addReplyBulkCString(c,server.masterhost);
1539         addReplyLongLong(c,server.masterport);
1540         switch(server.repl_state) {
1541         case REDIS_REPL_NONE: slavestate = "none"; break;
1542         case REDIS_REPL_CONNECT: slavestate = "connect"; break;
1543         case REDIS_REPL_CONNECTING: slavestate = "connecting"; break;
1544         case REDIS_REPL_RECEIVE_PONG: /* see next */
1545         case REDIS_REPL_TRANSFER: slavestate = "sync"; break;
1546         case REDIS_REPL_CONNECTED: slavestate = "connected"; break;
1547         default: slavestate = "unknown"; break;
1548         }
1549         addReplyBulkCString(c,slavestate);
1550         addReplyLongLong(c,server.master ? server.master->reploff : -1);
1551     }
1552 }
1553 
1554 /* Send a REPLCONF ACK command to the master to inform it about the current
1555  * processed offset. If we are not connected with a master, the command has
1556  * no effects. */
1557 void replicationSendAck(void) {
1558     redisClient *c = server.master;
1559 
1560     if (c != NULL) {
1561         c->flags |= REDIS_MASTER_FORCE_REPLY;
1562         addReplyMultiBulkLen(c,3);
1563         addReplyBulkCString(c,"REPLCONF");
1564         addReplyBulkCString(c,"ACK");
1565         addReplyBulkLongLong(c,c->reploff);
1566         c->flags &= ~REDIS_MASTER_FORCE_REPLY;
1567     }
1568 }
1569 
1570 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
1571 
1572 /* In order to implement partial synchronization we need to be able to cache
1573  * our master's client structure after a transient disconnection.
1574  * It is cached into server.cached_master and flushed away using the following
1575  * functions. */
1576 
1577 /* This function is called by freeClient() in order to cache the master
1578  * client structure instead of destryoing it. freeClient() will return
1579  * ASAP after this function returns, so every action needed to avoid problems
1580  * with a client that is really "suspended" has to be done by this function.
1581  *
1582  * The other functions that will deal with the cached master are:
1583  *
1584  * replicationDiscardCachedMaster() that will make sure to kill the client
1585  * as for some reason we don't want to use it in the future.
1586  *
1587  * replicationResurrectCachedMaster() that is used after a successful PSYNC
1588  * handshake in order to reactivate the cached master.
1589  */
1590 void replicationCacheMaster(redisClient *c) {
1591     listNode *ln;
1592 
1593     redisAssert(server.master != NULL && server.cached_master == NULL);
1594     redisLog(REDIS_NOTICE,"Caching the disconnected master state.");
1595 
1596     /* Remove from the list of clients, we don't want this client to be
1597      * listed by CLIENT LIST or processed in any way by batch operations. */
1598     ln = listSearchKey(server.clients,c);
1599     redisAssert(ln != NULL);
1600     listDelNode(server.clients,ln);
1601 
1602     /* Save the master. Server.master will be set to null later by
1603      * replicationHandleMasterDisconnection(). */
1604     server.cached_master = server.master;
1605 
1606     /* Remove the event handlers and close the socket. We'll later reuse
1607      * the socket of the new connection with the master during PSYNC. */
1608     aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1609     aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1610     close(c->fd);
1611 
1612     /* Set fd to -1 so that we can safely call freeClient(c) later. */
1613     c->fd = -1;
1614 
1615     /* Invalidate the Peer ID cache. */
1616     if (c->peerid) {
1617         sdsfree(c->peerid);
1618         c->peerid = NULL;
1619     }
1620 
1621     /* Caching the master happens instead of the actual freeClient() call,
1622      * so make sure to adjust the replication state. This function will
1623      * also set server.master to NULL. */
1624     replicationHandleMasterDisconnection();
1625 }
1626 
1627 /* Free a cached master, called when there are no longer the conditions for
1628  * a partial resync on reconnection. */
1629 void replicationDiscardCachedMaster(void) {
1630     if (server.cached_master == NULL) return;
1631 
1632     redisLog(REDIS_NOTICE,"Discarding previously cached master state.");
1633     server.cached_master->flags &= ~REDIS_MASTER;
1634     freeClient(server.cached_master);
1635     server.cached_master = NULL;
1636 }
1637 
1638 /* Turn the cached master into the current master, using the file descriptor
1639  * passed as argument as the socket for the new master.
1640  *
1641  * This function is called when successfully setup a partial resynchronization
1642  * so the stream of data that we'll receive will start from were this
1643  * master left. */
1644 void replicationResurrectCachedMaster(int newfd) {
1645     server.master = server.cached_master;
1646     server.cached_master = NULL;
1647     server.master->fd = newfd;
1648     server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
1649     server.master->authenticated = 1;
1650     server.master->lastinteraction = server.unixtime;
1651     server.repl_state = REDIS_REPL_CONNECTED;
1652 
1653     /* Re-add to the list of clients. */
1654     listAddNodeTail(server.clients,server.master);
1655     if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
1656                           readQueryFromClient, server.master)) {
1657         redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
1658         freeClientAsync(server.master); /* Close ASAP. */
1659     }
1660 
1661     /* We may also need to install the write handler as well if there is
1662      * pending data in the write buffers. */
1663     if (server.master->bufpos || listLength(server.master->reply)) {
1664         if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
1665                           sendReplyToClient, server.master)) {
1666             redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
1667             freeClientAsync(server.master); /* Close ASAP. */
1668         }
1669     }
1670 }
1671 
1672 /* ------------------------- MIN-SLAVES-TO-WRITE  --------------------------- */
1673 
1674 /* This function counts the number of slaves with lag <= min-slaves-max-lag.
1675  * If the option is active, the server will prevent writes if there are not
1676  * enough connected slaves with the specified lag (or less). */
1677 void refreshGoodSlavesCount(void) {
1678     listIter li;
1679     listNode *ln;
1680     int good = 0;
1681 
1682     if (!server.repl_min_slaves_to_write ||
1683         !server.repl_min_slaves_max_lag) return;
1684 
1685     listRewind(server.slaves,&li);
1686     while((ln = listNext(&li))) {
1687         redisClient *slave = ln->value;
1688         time_t lag = server.unixtime - slave->repl_ack_time;
1689 
1690         if (slave->replstate == REDIS_REPL_ONLINE &&
1691             lag <= server.repl_min_slaves_max_lag) good++;
1692     }
1693     server.repl_good_slaves_count = good;
1694 }
1695 
1696 /* ----------------------- REPLICATION SCRIPT CACHE --------------------------
1697  * The goal of this code is to keep track of scripts already sent to every
1698  * connected slave, in order to be able to replicate EVALSHA as it is without
1699  * translating it to EVAL every time it is possible.
1700  *
1701  * We use a capped collection implemented by a hash table for fast lookup
1702  * of scripts we can send as EVALSHA, plus a linked list that is used for
1703  * eviction of the oldest entry when the max number of items is reached.
1704  *
1705  * We don't care about taking a different cache for every different slave
1706  * since to fill the cache again is not very costly, the goal of this code
1707  * is to avoid that the same big script is trasmitted a big number of times
1708  * per second wasting bandwidth and processor speed, but it is not a problem
1709  * if we need to rebuild the cache from scratch from time to time, every used
1710  * script will need to be transmitted a single time to reappear in the cache.
1711  *
1712  * This is how the system works:
1713  *
1714  * 1) Every time a new slave connects, we flush the whole script cache.
1715  * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
1716  *    trying to convert EVAL into EVALSHA specifically for slaves.
1717  * 3) Every time we trasmit a script as EVAL to the slaves, we also add the
1718  *    corresponding SHA1 of the script into the cache as we are sure every
1719  *    slave knows about the script starting from now.
1720  * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves
1721  *    and at the same time flush the script cache.
1722  * 5) When the last slave disconnects, flush the cache.
1723  * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded
1724  *    in the master sometimes.
1725  */
1726 
1727 /* Initialize the script cache, only called at startup. */
1728 void replicationScriptCacheInit(void) {
1729     server.repl_scriptcache_size = 10000;
1730     server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL);
1731     server.repl_scriptcache_fifo = listCreate();
1732 }
1733 
1734 /* Empty the script cache. Should be called every time we are no longer sure
1735  * that every slave knows about all the scripts in our set, or when the
1736  * current AOF "context" is no longer aware of the script. In general we
1737  * should flush the cache:
1738  *
1739  * 1) Every time a new slave reconnects to this master and performs a
1740  *    full SYNC (PSYNC does not require flushing).
1741  * 2) Every time an AOF rewrite is performed.
1742  * 3) Every time we are left without slaves at all, and AOF is off, in order
1743  *    to reclaim otherwise unused memory.
1744  */
1745 void replicationScriptCacheFlush(void) {
1746     dictEmpty(server.repl_scriptcache_dict,NULL);
1747     listRelease(server.repl_scriptcache_fifo);
1748     server.repl_scriptcache_fifo = listCreate();
1749 }
1750 
1751 /* Add an entry into the script cache, if we reach max number of entries the
1752  * oldest is removed from the list. */
1753 void replicationScriptCacheAdd(sds sha1) {
1754     int retval;
1755     sds key = sdsdup(sha1);
1756 
1757     /* Evict oldest. */
1758     if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size)
1759     {
1760         listNode *ln = listLast(server.repl_scriptcache_fifo);
1761         sds oldest = listNodeValue(ln);
1762 
1763         retval = dictDelete(server.repl_scriptcache_dict,oldest);
1764         redisAssert(retval == DICT_OK);
1765         listDelNode(server.repl_scriptcache_fifo,ln);
1766     }
1767 
1768     /* Add current. */
1769     retval = dictAdd(server.repl_scriptcache_dict,key,NULL);
1770     listAddNodeHead(server.repl_scriptcache_fifo,key);
1771     redisAssert(retval == DICT_OK);
1772 }
1773 
1774 /* Returns non-zero if the specified entry exists inside the cache, that is,
1775  * if all the slaves are aware of this script SHA1. */
1776 int replicationScriptCacheExists(sds sha1) {
1777     return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
1778 }
1779 
1780 /* ----------------------- SYNCHRONOUS REPLICATION --------------------------
1781  * Redis synchronous replication design can be summarized in points:
1782  *
1783  * - Redis masters have a global replication offset, used by PSYNC.
1784  * - Master increment the offset every time new commands are sent to slaves.
1785  * - Slaves ping back masters with the offset processed so far.
1786  *
1787  * So synchronous replication adds a new WAIT command in the form:
1788  *
1789  *   WAIT <num_replicas> <milliseconds_timeout>
1790  *
1791  * That returns the number of replicas that processed the query when
1792  * we finally have at least num_replicas, or when the timeout was
1793  * reached.
1794  *
1795  * The command is implemented in this way:
1796  *
1797  * - Every time a client processes a command, we remember the replication
1798  *   offset after sending that command to the slaves.
1799  * - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
1800  *   The client is blocked at the same time (see blocked.c).
1801  * - Once we receive enough ACKs for a given offset or when the timeout
1802  *   is reached, the WAIT command is unblocked and the reply sent to the
1803  *   client.
1804  */
1805 
1806 /* This just set a flag so that we broadcast a REPLCONF GETACK command
1807  * to all the slaves in the beforeSleep() function. Note that this way
1808  * we "group" all the clients that want to wait for synchronouns replication
1809  * in a given event loop iteration, and send a single GETACK for them all. */
1810 void replicationRequestAckFromSlaves(void) {
1811     server.get_ack_from_slaves = 1;
1812 }
1813 
1814 /* Return the number of slaves that already acknowledged the specified
1815  * replication offset. */
1816 int replicationCountAcksByOffset(long long offset) {
1817     listIter li;
1818     listNode *ln;
1819     int count = 0;
1820 
1821     listRewind(server.slaves,&li);
1822     while((ln = listNext(&li))) {
1823         redisClient *slave = ln->value;
1824 
1825         if (slave->replstate != REDIS_REPL_ONLINE) continue;
1826         if (slave->repl_ack_off >= offset) count++;
1827     }
1828     return count;
1829 }
1830 
1831 /* WAIT for N replicas to acknowledge the processing of our latest
1832  * write command (and all the previous commands). */
1833 void waitCommand(redisClient *c) {
1834     mstime_t timeout;
1835     long numreplicas, ackreplicas;
1836     long long offset = c->woff;
1837 
1838     /* Argument parsing. */
1839     if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != REDIS_OK)
1840         return;
1841     if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
1842         != REDIS_OK) return;
1843 
1844     /* First try without blocking at all. */
1845     ackreplicas = replicationCountAcksByOffset(c->woff);
1846     if (ackreplicas >= numreplicas || c->flags & REDIS_MULTI) {
1847         addReplyLongLong(c,ackreplicas);
1848         return;
1849     }
1850 
1851     /* Otherwise block the client and put it into our list of clients
1852      * waiting for ack from slaves. */
1853     c->bpop.timeout = timeout;
1854     c->bpop.reploffset = offset;
1855     c->bpop.numreplicas = numreplicas;
1856     listAddNodeTail(server.clients_waiting_acks,c);
1857     blockClient(c,REDIS_BLOCKED_WAIT);
1858 
1859     /* Make sure that the server will send an ACK request to all the slaves
1860      * before returning to the event loop. */
1861     replicationRequestAckFromSlaves();
1862 }
1863 
1864 /* This is called by unblockClient() to perform the blocking op type
1865  * specific cleanup. We just remove the client from the list of clients
1866  * waiting for replica acks. Never call it directly, call unblockClient()
1867  * instead. */
1868 void unblockClientWaitingReplicas(redisClient *c) {
1869     listNode *ln = listSearchKey(server.clients_waiting_acks,c);
1870     redisAssert(ln != NULL);
1871     listDelNode(server.clients_waiting_acks,ln);
1872 }
1873 
1874 /* Check if there are clients blocked in WAIT that can be unblocked since
1875  * we received enough ACKs from slaves. */
1876 void processClientsWaitingReplicas(void) {
1877     long long last_offset = 0;
1878     int last_numreplicas = 0;
1879 
1880     listIter li;
1881     listNode *ln;
1882 
1883     listRewind(server.clients_waiting_acks,&li);
1884     while((ln = listNext(&li))) {
1885         redisClient *c = ln->value;
1886 
1887         /* Every time we find a client that is satisfied for a given
1888          * offset and number of replicas, we remember it so the next client
1889          * may be unblocked without calling replicationCountAcksByOffset()
1890          * if the requested offset / replicas were equal or less. */
1891         if (last_offset && last_offset > c->bpop.reploffset &&
1892                            last_numreplicas > c->bpop.numreplicas)
1893         {
1894             unblockClient(c);
1895             addReplyLongLong(c,last_numreplicas);
1896         } else {
1897             int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
1898 
1899             if (numreplicas >= c->bpop.numreplicas) {
1900                 last_offset = c->bpop.reploffset;
1901                 last_numreplicas = numreplicas;
1902                 unblockClient(c);
1903                 addReplyLongLong(c,numreplicas);
1904             }
1905         }
1906     }
1907 }
1908 
1909 /* Return the slave replication offset for this instance, that is
1910  * the offset for which we already processed the master replication stream. */
1911 long long replicationGetSlaveOffset(void) {
1912     long long offset = 0;
1913 
1914     if (server.masterhost != NULL) {
1915         if (server.master) {
1916             offset = server.master->reploff;
1917         } else if (server.cached_master) {
1918             offset = server.cached_master->reploff;
1919         }
1920     }
1921     /* offset may be -1 when the master does not support it at all, however
1922      * this function is designed to return an offset that can express the
1923      * amount of data processed by the master, so we return a positive
1924      * integer. */
1925     if (offset < 0) offset = 0;
1926     return offset;
1927 }
1928 
1929 /* --------------------------- REPLICATION CRON  ---------------------------- */
1930 
1931 /* Replication cron function, called 1 time per second. */
1932 void replicationCron(void) {
1933     /* Non blocking connection timeout? */
1934     if (server.masterhost &&
1935         (server.repl_state == REDIS_REPL_CONNECTING ||
1936          server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
1937         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
1938     {
1939         redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
1940         undoConnectWithMaster();
1941     }
1942 
1943     /* Bulk transfer I/O timeout? */
1944     if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER &&
1945         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
1946     {
1947         redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
1948         replicationAbortSyncTransfer();
1949     }
1950 
1951     /* Timed out master when we are an already connected slave? */
1952     if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&
1953         (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
1954     {
1955         redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received...");
1956         freeClient(server.master);
1957     }
1958 
1959     /* Check if we should connect to a MASTER */
1960     if (server.repl_state == REDIS_REPL_CONNECT) {
1961         redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
1962             server.masterhost, server.masterport);
1963         if (connectWithMaster() == REDIS_OK) {
1964             redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
1965         }
1966     }
1967 
1968     /* Send ACK to master from time to time.
1969      * Note that we do not send periodic acks to masters that don't
1970      * support PSYNC and replication offsets. */
1971     if (server.masterhost && server.master &&
1972         !(server.master->flags & REDIS_PRE_PSYNC))
1973         replicationSendAck();
1974 
1975     /* If we have attached slaves, PING them from time to time.
1976      * So slaves can implement an explicit timeout to masters, and will
1977      * be able to detect a link disconnection even if the TCP connection
1978      * will not actually go down. */
1979     if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {
1980         listIter li;
1981         listNode *ln;
1982         robj *ping_argv[1];
1983 
1984         /* First, send PING */
1985         ping_argv[0] = createStringObject("PING",4);
1986         replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
1987         decrRefCount(ping_argv[0]);
1988 
1989         /* Second, send a newline to all the slaves in pre-synchronization
1990          * stage, that is, slaves waiting for the master to create the RDB file.
1991          * The newline will be ignored by the slave but will refresh the
1992          * last-io timer preventing a timeout. */
1993         listRewind(server.slaves,&li);
1994         while((ln = listNext(&li))) {
1995             redisClient *slave = ln->value;
1996 
1997             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
1998                 (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END &&
1999                  server.rdb_child_type != REDIS_RDB_CHILD_TYPE_SOCKET))
2000             {
2001                 if (write(slave->fd, "\n", 1) == -1) {
2002                     /* Don't worry, it's just a ping. */
2003                 }
2004             }
2005         }
2006     }
2007 
2008     /* Disconnect timedout slaves. */
2009     if (listLength(server.slaves)) {
2010         listIter li;
2011         listNode *ln;
2012 
2013         listRewind(server.slaves,&li);
2014         while((ln = listNext(&li))) {
2015             redisClient *slave = ln->value;
2016 
2017             if (slave->replstate != REDIS_REPL_ONLINE) continue;
2018             if (slave->flags & REDIS_PRE_PSYNC) continue;
2019             if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
2020             {
2021                 redisLog(REDIS_WARNING, "Disconnecting timedout slave: %s",
2022                     replicationGetSlaveName(slave));
2023                 freeClient(slave);
2024             }
2025         }
2026     }
2027 
2028     /* If we have no attached slaves and there is a replication backlog
2029      * using memory, free it after some (configured) time. */
2030     if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
2031         server.repl_backlog)
2032     {
2033         time_t idle = server.unixtime - server.repl_no_slaves_since;
2034 
2035         if (idle > server.repl_backlog_time_limit) {
2036             freeReplicationBacklog();
2037             redisLog(REDIS_NOTICE,
2038                 "Replication backlog freed after %d seconds "
2039                 "without connected slaves.",
2040                 (int) server.repl_backlog_time_limit);
2041         }
2042     }
2043 
2044     /* If AOF is disabled and we no longer have attached slaves, we can
2045      * free our Replication Script Cache as there is no need to propagate
2046      * EVALSHA at all. */
2047     if (listLength(server.slaves) == 0 &&
2048         server.aof_state == REDIS_AOF_OFF &&
2049         listLength(server.repl_scriptcache_fifo) != 0)
2050     {
2051         replicationScriptCacheFlush();
2052     }
2053 
2054     /* If we are using diskless replication and there are slaves waiting
2055      * in WAIT_BGSAVE_START state, check if enough seconds elapsed and
2056      * start a BGSAVE.
2057      *
2058      * This code is also useful to trigger a BGSAVE if the diskless
2059      * replication was turned off with CONFIG SET, while there were already
2060      * slaves in WAIT_BGSAVE_START state. */
2061     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
2062         time_t idle, max_idle = 0;
2063         int slaves_waiting = 0;
2064         listNode *ln;
2065         listIter li;
2066 
2067         listRewind(server.slaves,&li);
2068         while((ln = listNext(&li))) {
2069             redisClient *slave = ln->value;
2070             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
2071                 idle = server.unixtime - slave->lastinteraction;
2072                 if (idle > max_idle) max_idle = idle;
2073                 slaves_waiting++;
2074             }
2075         }
2076 
2077         if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) {
2078             /* Start a BGSAVE. Usually with socket target, or with disk target
2079              * if there was a recent socket -> disk config change. */
2080             if (startBgsaveForReplication() == REDIS_OK) {
2081                 /* It started! We need to change the state of slaves
2082                  * from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case
2083                  * the current target is disk. Otherwise it was already done
2084                  * by rdbSaveToSlavesSockets() which is called by
2085                  * startBgsaveForReplication(). */
2086                 listRewind(server.slaves,&li);
2087                 while((ln = listNext(&li))) {
2088                     redisClient *slave = ln->value;
2089                     if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
2090                         slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
2091                 }
2092             }
2093         }
2094     }
2095 
2096     /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
2097     refreshGoodSlavesCount();
2098 }
2099