xref: /redis-3.2.3/src/replication.c (revision e67ad1d1)
14365e5b2Santirez /* Asynchronous replication implementation.
24365e5b2Santirez  *
34365e5b2Santirez  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
44365e5b2Santirez  * All rights reserved.
54365e5b2Santirez  *
64365e5b2Santirez  * Redistribution and use in source and binary forms, with or without
74365e5b2Santirez  * modification, are permitted provided that the following conditions are met:
84365e5b2Santirez  *
94365e5b2Santirez  *   * Redistributions of source code must retain the above copyright notice,
104365e5b2Santirez  *     this list of conditions and the following disclaimer.
114365e5b2Santirez  *   * Redistributions in binary form must reproduce the above copyright
124365e5b2Santirez  *     notice, this list of conditions and the following disclaimer in the
134365e5b2Santirez  *     documentation and/or other materials provided with the distribution.
144365e5b2Santirez  *   * Neither the name of Redis nor the names of its contributors may be used
154365e5b2Santirez  *     to endorse or promote products derived from this software without
164365e5b2Santirez  *     specific prior written permission.
174365e5b2Santirez  *
184365e5b2Santirez  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
194365e5b2Santirez  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
204365e5b2Santirez  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
214365e5b2Santirez  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
224365e5b2Santirez  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
234365e5b2Santirez  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
244365e5b2Santirez  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
254365e5b2Santirez  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
264365e5b2Santirez  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
274365e5b2Santirez  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
284365e5b2Santirez  * POSSIBILITY OF SUCH DAMAGE.
294365e5b2Santirez  */
304365e5b2Santirez 
314365e5b2Santirez 
32cef054e8Santirez #include "server.h"
33e2641e09Santirez 
34e2641e09Santirez #include <sys/time.h>
35e2641e09Santirez #include <unistd.h>
36e2641e09Santirez #include <fcntl.h>
37d310fbedSantirez #include <sys/socket.h>
38e2641e09Santirez #include <sys/stat.h>
39e2641e09Santirez 
4007888202Santirez void replicationDiscardCachedMaster(void);
4107888202Santirez void replicationResurrectCachedMaster(int newfd);
42c5618e7fSantirez void replicationSendAck(void);
43554bd0e7Santirez void putSlaveOnline(client *slave);
44c5f8c80aSantirez int cancelReplicationHandshake(void);
4507888202Santirez 
468a416ca4Santirez /* --------------------------- Utility functions ---------------------------- */
478a416ca4Santirez 
488a416ca4Santirez /* Return the pointer to a string representing the slave ip:listening_port
498a416ca4Santirez  * pair. Mostly useful for logging, since we want to log a slave using its
500a45fbc3Santirez  * IP address and its listening port which is more clear for the user, for
518a416ca4Santirez  * example: "Closing connection with slave 10.1.2.3:6380". */
replicationGetSlaveName(client * c)52554bd0e7Santirez char *replicationGetSlaveName(client *c) {
5332f80e2fSantirez     static char buf[NET_PEER_ID_LEN];
5432f80e2fSantirez     char ip[NET_IP_STR_LEN];
558a416ca4Santirez 
568a416ca4Santirez     ip[0] = '\0';
578a416ca4Santirez     buf[0] = '\0';
580a45fbc3Santirez     if (c->slave_ip[0] != '\0' ||
590a45fbc3Santirez         anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1)
600a45fbc3Santirez     {
610a45fbc3Santirez         /* Note that the 'ip' buffer is always larger than 'c->slave_ip' */
620a45fbc3Santirez         if (c->slave_ip[0] != '\0') memcpy(ip,c->slave_ip,sizeof(c->slave_ip));
630a45fbc3Santirez 
648a416ca4Santirez         if (c->slave_listening_port)
65ce269ad3Santirez             anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port);
668a416ca4Santirez         else
678a416ca4Santirez             snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip);
688a416ca4Santirez     } else {
698a416ca4Santirez         snprintf(buf,sizeof(buf),"client id #%llu",
708a416ca4Santirez             (unsigned long long) c->id);
718a416ca4Santirez     }
728a416ca4Santirez     return buf;
738a416ca4Santirez }
748a416ca4Santirez 
75f4aa600bSantirez /* ---------------------------------- MASTER -------------------------------- */
76f4aa600bSantirez 
createReplicationBacklog(void)7707888202Santirez void createReplicationBacklog(void) {
782d9e3eb1Santirez     serverAssert(server.repl_backlog == NULL);
7907888202Santirez     server.repl_backlog = zmalloc(server.repl_backlog_size);
8007888202Santirez     server.repl_backlog_histlen = 0;
8107888202Santirez     server.repl_backlog_idx = 0;
8207888202Santirez     /* When a new backlog buffer is created, we increment the replication
8307888202Santirez      * offset by one to make sure we'll not be able to PSYNC with any
8407888202Santirez      * previous slave. This is needed because we avoid incrementing the
8507888202Santirez      * master_repl_offset if no backlog exists nor slaves are attached. */
8607888202Santirez     server.master_repl_offset++;
8707888202Santirez 
8807888202Santirez     /* We don't have any data inside our buffer, but virtually the first
8907888202Santirez      * byte we have is the next byte that will be generated for the
9007888202Santirez      * replication stream. */
9107888202Santirez     server.repl_backlog_off = server.master_repl_offset+1;
9207888202Santirez }
9307888202Santirez 
9407888202Santirez /* This function is called when the user modifies the replication backlog
9507888202Santirez  * size at runtime. It is up to the function to both update the
9607888202Santirez  * server.repl_backlog_size and to resize the buffer and setup it so that
9707888202Santirez  * it contains the same data as the previous one (possibly less data, but
9807888202Santirez  * the most recent bytes, or the same data and more free space in case the
9907888202Santirez  * buffer is enlarged). */
resizeReplicationBacklog(long long newsize)10007888202Santirez void resizeReplicationBacklog(long long newsize) {
10132f80e2fSantirez     if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
10232f80e2fSantirez         newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
10307888202Santirez     if (server.repl_backlog_size == newsize) return;
10407888202Santirez 
10507888202Santirez     server.repl_backlog_size = newsize;
10607888202Santirez     if (server.repl_backlog != NULL) {
10707888202Santirez         /* What we actually do is to flush the old buffer and realloc a new
10807888202Santirez          * empty one. It will refill with new data incrementally.
10907888202Santirez          * The reason is that copying a few gigabytes adds latency and even
11007888202Santirez          * worse often we need to alloc additional space before freeing the
11107888202Santirez          * old buffer. */
11207888202Santirez         zfree(server.repl_backlog);
11307888202Santirez         server.repl_backlog = zmalloc(server.repl_backlog_size);
11407888202Santirez         server.repl_backlog_histlen = 0;
11507888202Santirez         server.repl_backlog_idx = 0;
1163a82b8acSAaron Rutkovsky         /* Next byte we have is... the next since the buffer is empty. */
11707888202Santirez         server.repl_backlog_off = server.master_repl_offset+1;
11807888202Santirez     }
11907888202Santirez }
12007888202Santirez 
freeReplicationBacklog(void)12107888202Santirez void freeReplicationBacklog(void) {
1222d9e3eb1Santirez     serverAssert(listLength(server.slaves) == 0);
12307888202Santirez     zfree(server.repl_backlog);
12407888202Santirez     server.repl_backlog = NULL;
12507888202Santirez }
12607888202Santirez 
12707888202Santirez /* Add data to the replication backlog.
12807888202Santirez  * This function also increments the global replication offset stored at
12907888202Santirez  * server.master_repl_offset, because there is no case where we want to feed
13007888202Santirez  * the backlog without incrementing the buffer. */
feedReplicationBacklog(void * ptr,size_t len)13107888202Santirez void feedReplicationBacklog(void *ptr, size_t len) {
13207888202Santirez     unsigned char *p = ptr;
13307888202Santirez 
13407888202Santirez     server.master_repl_offset += len;
13507888202Santirez 
13607888202Santirez     /* This is a circular buffer, so write as much data we can at every
13707888202Santirez      * iteration and rewind the "idx" index if we reach the limit. */
13807888202Santirez     while(len) {
13907888202Santirez         size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
14007888202Santirez         if (thislen > len) thislen = len;
14107888202Santirez         memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
14207888202Santirez         server.repl_backlog_idx += thislen;
14307888202Santirez         if (server.repl_backlog_idx == server.repl_backlog_size)
14407888202Santirez             server.repl_backlog_idx = 0;
14507888202Santirez         len -= thislen;
14607888202Santirez         p += thislen;
14707888202Santirez         server.repl_backlog_histlen += thislen;
14807888202Santirez     }
14907888202Santirez     if (server.repl_backlog_histlen > server.repl_backlog_size)
15007888202Santirez         server.repl_backlog_histlen = server.repl_backlog_size;
15107888202Santirez     /* Set the offset of the first byte we have in the backlog. */
15207888202Santirez     server.repl_backlog_off = server.master_repl_offset -
15307888202Santirez                               server.repl_backlog_histlen + 1;
15407888202Santirez }
15507888202Santirez 
15607888202Santirez /* Wrapper for feedReplicationBacklog() that takes Redis string objects
15707888202Santirez  * as input. */
feedReplicationBacklogWithObject(robj * o)15807888202Santirez void feedReplicationBacklogWithObject(robj *o) {
15932f80e2fSantirez     char llstr[LONG_STR_SIZE];
16007888202Santirez     void *p;
16107888202Santirez     size_t len;
16207888202Santirez 
16314ff5724Santirez     if (o->encoding == OBJ_ENCODING_INT) {
16407888202Santirez         len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
16507888202Santirez         p = llstr;
16607888202Santirez     } else {
16707888202Santirez         len = sdslen(o->ptr);
16807888202Santirez         p = o->ptr;
16907888202Santirez     }
17007888202Santirez     feedReplicationBacklog(p,len);
17107888202Santirez }
17207888202Santirez 
replicationFeedSlaves(list * slaves,int dictid,robj ** argv,int argc)173e2641e09Santirez void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
174e2641e09Santirez     listNode *ln;
175e2641e09Santirez     listIter li;
176dcc48a81Santirez     int j, len;
17732f80e2fSantirez     char llstr[LONG_STR_SIZE];
178e2641e09Santirez 
17907888202Santirez     /* If there aren't slaves, and there is no backlog buffer to populate,
18007888202Santirez      * we can return ASAP. */
18107888202Santirez     if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
18207888202Santirez 
18307888202Santirez     /* We can't have slaves attached and no backlog. */
1842d9e3eb1Santirez     serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
18507888202Santirez 
186dcc48a81Santirez     /* Send SELECT command to every slave if needed. */
18707888202Santirez     if (server.slaveseldb != dictid) {
188dcc48a81Santirez         robj *selectcmd;
18907888202Santirez 
190dcc48a81Santirez         /* For a few DBs we have pre-computed SELECT command. */
19132f80e2fSantirez         if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
192dcc48a81Santirez             selectcmd = shared.select[dictid];
19307888202Santirez         } else {
19407888202Santirez             int dictid_len;
19507888202Santirez 
19607888202Santirez             dictid_len = ll2string(llstr,sizeof(llstr),dictid);
19714ff5724Santirez             selectcmd = createObject(OBJ_STRING,
198dcc48a81Santirez                 sdscatprintf(sdsempty(),
199dcc48a81Santirez                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
200dcc48a81Santirez                 dictid_len, llstr));
20107888202Santirez         }
202dcc48a81Santirez 
203dcc48a81Santirez         /* Add the SELECT command into the backlog. */
204dcc48a81Santirez         if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
205dcc48a81Santirez 
206dcc48a81Santirez         /* Send it to slaves. */
207dcc48a81Santirez         listRewind(slaves,&li);
208dcc48a81Santirez         while((ln = listNext(&li))) {
209554bd0e7Santirez             client *slave = ln->value;
210a5a06a8eSantirez             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
211dcc48a81Santirez             addReply(slave,selectcmd);
212dcc48a81Santirez         }
213dcc48a81Santirez 
21432f80e2fSantirez         if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
215dcc48a81Santirez             decrRefCount(selectcmd);
21607888202Santirez     }
21707888202Santirez     server.slaveseldb = dictid;
21807888202Santirez 
219dcc48a81Santirez     /* Write the command to the replication backlog if any. */
22007888202Santirez     if (server.repl_backlog) {
22132f80e2fSantirez         char aux[LONG_STR_SIZE+3];
222dcc48a81Santirez 
223dcc48a81Santirez         /* Add the multi bulk reply length. */
224dcc48a81Santirez         aux[0] = '*';
22570e82e5cSMaxim Zakharov         len = ll2string(aux+1,sizeof(aux)-1,argc);
226dcc48a81Santirez         aux[len+1] = '\r';
227dcc48a81Santirez         aux[len+2] = '\n';
228dcc48a81Santirez         feedReplicationBacklog(aux,len+3);
229dcc48a81Santirez 
230dcc48a81Santirez         for (j = 0; j < argc; j++) {
231dcc48a81Santirez             long objlen = stringObjectLen(argv[j]);
23207888202Santirez 
23307888202Santirez             /* We need to feed the buffer with the object as a bulk reply
23407888202Santirez              * not just as a plain string, so create the $..CRLF payload len
2359f98b29cSJan-Erik Rediger              * and add the final CRLF */
23607888202Santirez             aux[0] = '$';
237dcc48a81Santirez             len = ll2string(aux+1,sizeof(aux)-1,objlen);
23807888202Santirez             aux[len+1] = '\r';
23907888202Santirez             aux[len+2] = '\n';
24007888202Santirez             feedReplicationBacklog(aux,len+3);
241dcc48a81Santirez             feedReplicationBacklogWithObject(argv[j]);
242c06de115Santirez             feedReplicationBacklog(aux+len+1,2);
24307888202Santirez         }
24407888202Santirez     }
24507888202Santirez 
246dcc48a81Santirez     /* Write the command to every slave. */
247e9e00755Santirez     listRewind(server.slaves,&li);
248e2641e09Santirez     while((ln = listNext(&li))) {
249554bd0e7Santirez         client *slave = ln->value;
250e2641e09Santirez 
251e2641e09Santirez         /* Don't feed slaves that are still waiting for BGSAVE to start */
25232f80e2fSantirez         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
253e2641e09Santirez 
254632e4c09SPieter Noordhuis         /* Feed slaves that are waiting for the initial SYNC (so these commands
2559d09ce39Sguiquanz          * are queued in the output buffer until the initial SYNC completes),
256632e4c09SPieter Noordhuis          * or are already in sync with the master. */
257e2641e09Santirez 
258dcc48a81Santirez         /* Add the multi bulk length. */
259dcc48a81Santirez         addReplyMultiBulkLen(slave,argc);
260e34a35a5Santirez 
26107888202Santirez         /* Finally any additional argument that was not stored inside the
26207888202Santirez          * static buffer if any (from j to argc). */
263dcc48a81Santirez         for (j = 0; j < argc; j++)
264dcc48a81Santirez             addReplyBulk(slave,argv[j]);
265e2641e09Santirez     }
266e2641e09Santirez }
267e2641e09Santirez 
replicationFeedMonitors(client * c,list * monitors,int dictid,robj ** argv,int argc)268554bd0e7Santirez void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
269e2641e09Santirez     listNode *ln;
270e2641e09Santirez     listIter li;
271d1cbad6dSantirez     int j;
272e2641e09Santirez     sds cmdrepr = sdsnew("+");
273e2641e09Santirez     robj *cmdobj;
274e2641e09Santirez     struct timeval tv;
275e2641e09Santirez 
276e2641e09Santirez     gettimeofday(&tv,NULL);
2772b2eca1fSPieter Noordhuis     cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
27832f80e2fSantirez     if (c->flags & CLIENT_LUA) {
279e31b615eSantirez         cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
28032f80e2fSantirez     } else if (c->flags & CLIENT_UNIX_SOCKET) {
2812ea41242Santirez         cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
282e31b615eSantirez     } else {
2830bcc7cb4Santirez         cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
284e31b615eSantirez     }
285e2641e09Santirez 
286e2641e09Santirez     for (j = 0; j < argc; j++) {
28714ff5724Santirez         if (argv[j]->encoding == OBJ_ENCODING_INT) {
288d3b958c3Santirez             cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
289e2641e09Santirez         } else {
290e2641e09Santirez             cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
291e2641e09Santirez                         sdslen(argv[j]->ptr));
292e2641e09Santirez         }
293e2641e09Santirez         if (j != argc-1)
294e2641e09Santirez             cmdrepr = sdscatlen(cmdrepr," ",1);
295e2641e09Santirez     }
296e2641e09Santirez     cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
29714ff5724Santirez     cmdobj = createObject(OBJ_STRING,cmdrepr);
298e2641e09Santirez 
299e2641e09Santirez     listRewind(monitors,&li);
300e2641e09Santirez     while((ln = listNext(&li))) {
301554bd0e7Santirez         client *monitor = ln->value;
302e2641e09Santirez         addReply(monitor,cmdobj);
303e2641e09Santirez     }
304e2641e09Santirez     decrRefCount(cmdobj);
305e2641e09Santirez }
306e2641e09Santirez 
30707888202Santirez /* Feed the slave 'c' with the replication backlog starting from the
30807888202Santirez  * specified 'offset' up to the end of the backlog. */
addReplyReplicationBacklog(client * c,long long offset)309554bd0e7Santirez long long addReplyReplicationBacklog(client *c, long long offset) {
31007888202Santirez     long long j, skip, len;
31107888202Santirez 
31232f80e2fSantirez     serverLog(LL_DEBUG, "[PSYNC] Slave request offset: %lld", offset);
31307888202Santirez 
31407888202Santirez     if (server.repl_backlog_histlen == 0) {
31532f80e2fSantirez         serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
31607888202Santirez         return 0;
31707888202Santirez     }
31807888202Santirez 
31932f80e2fSantirez     serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
3203af478e9Santirez              server.repl_backlog_size);
32132f80e2fSantirez     serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
3223af478e9Santirez              server.repl_backlog_off);
32332f80e2fSantirez     serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
3243af478e9Santirez              server.repl_backlog_histlen);
32532f80e2fSantirez     serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
3263af478e9Santirez              server.repl_backlog_idx);
32707888202Santirez 
32807888202Santirez     /* Compute the amount of bytes we need to discard. */
32907888202Santirez     skip = offset - server.repl_backlog_off;
33032f80e2fSantirez     serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
33107888202Santirez 
33207888202Santirez     /* Point j to the oldest byte, that is actaully our
33307888202Santirez      * server.repl_backlog_off byte. */
33407888202Santirez     j = (server.repl_backlog_idx +
33507888202Santirez         (server.repl_backlog_size-server.repl_backlog_histlen)) %
33607888202Santirez         server.repl_backlog_size;
33732f80e2fSantirez     serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
33807888202Santirez 
33907888202Santirez     /* Discard the amount of data to seek to the specified 'offset'. */
34007888202Santirez     j = (j + skip) % server.repl_backlog_size;
34107888202Santirez 
34207888202Santirez     /* Feed slave with data. Since it is a circular buffer we have to
34307888202Santirez      * split the reply in two parts if we are cross-boundary. */
34407888202Santirez     len = server.repl_backlog_histlen - skip;
34532f80e2fSantirez     serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
34607888202Santirez     while(len) {
34707888202Santirez         long long thislen =
34807888202Santirez             ((server.repl_backlog_size - j) < len) ?
34907888202Santirez             (server.repl_backlog_size - j) : len;
35007888202Santirez 
35132f80e2fSantirez         serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
35207888202Santirez         addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
35307888202Santirez         len -= thislen;
35407888202Santirez         j = 0;
35507888202Santirez     }
35607888202Santirez     return server.repl_backlog_histlen - skip;
35707888202Santirez }
35807888202Santirez 
359292fec05Santirez /* Return the offset to provide as reply to the PSYNC command received
360292fec05Santirez  * from the slave. The returned value is only valid immediately after
361292fec05Santirez  * the BGSAVE process started and before executing any other command
362292fec05Santirez  * from clients. */
getPsyncInitialOffset(void)363292fec05Santirez long long getPsyncInitialOffset(void) {
364292fec05Santirez     long long psync_offset = server.master_repl_offset;
365292fec05Santirez     /* Add 1 to psync_offset if it the replication backlog does not exists
366292fec05Santirez      * as when it will be created later we'll increment the offset by one. */
367292fec05Santirez     if (server.repl_backlog == NULL) psync_offset++;
368292fec05Santirez     return psync_offset;
369292fec05Santirez }
370292fec05Santirez 
37115de6b10Santirez /* Send a FULLRESYNC reply in the specific case of a full resynchronization,
37215de6b10Santirez  * as a side effect setup the slave for a full sync in different ways:
37315de6b10Santirez  *
37415de6b10Santirez  * 1) Remember, into the slave client structure, the offset we sent
37515de6b10Santirez  *    here, so that if new slaves will later attach to the same
376292fec05Santirez  *    background RDB saving process (by duplicating this client output
37715de6b10Santirez  *    buffer), we can get the right offset from this slave.
37815de6b10Santirez  * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
37915de6b10Santirez  *    we start accumulating differences from this point.
38015de6b10Santirez  * 3) Force the replication stream to re-emit a SELECT statement so
38115de6b10Santirez  *    the new slave incremental differences will start selecting the
38215de6b10Santirez  *    right database number.
383f18e5b63Santirez  *
384f18e5b63Santirez  * Normally this function should be called immediately after a successful
385f18e5b63Santirez  * BGSAVE for replication was started, or when there is one already in
386f18e5b63Santirez  * progress that we attached our slave to. */
replicationSetupSlaveForFullResync(client * slave,long long offset)38715de6b10Santirez int replicationSetupSlaveForFullResync(client *slave, long long offset) {
388292fec05Santirez     char buf[128];
389292fec05Santirez     int buflen;
390292fec05Santirez 
391292fec05Santirez     slave->psync_initial_offset = offset;
39215de6b10Santirez     slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
39315de6b10Santirez     /* We are going to accumulate the incremental changes for this
39415de6b10Santirez      * slave as well. Set slaveseldb to -1 in order to force to re-emit
39515de6b10Santirez      * a SLEECT statement in the replication stream. */
39615de6b10Santirez     server.slaveseldb = -1;
39715de6b10Santirez 
398292fec05Santirez     /* Don't send this reply to slaves that approached us with
399292fec05Santirez      * the old SYNC command. */
400292fec05Santirez     if (!(slave->flags & CLIENT_PRE_PSYNC)) {
401292fec05Santirez         buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
402292fec05Santirez                           server.runid,offset);
403292fec05Santirez         if (write(slave->fd,buf,buflen) != buflen) {
404292fec05Santirez             freeClientAsync(slave);
405292fec05Santirez             return C_ERR;
406292fec05Santirez         }
407292fec05Santirez     }
408292fec05Santirez     return C_OK;
409292fec05Santirez }
410292fec05Santirez 
41107888202Santirez /* This function handles the PSYNC command from the point of view of a
41207888202Santirez  * master receiving a request for partial resynchronization.
41307888202Santirez  *
41440eb548aSantirez  * On success return C_OK, otherwise C_ERR is returned and we proceed
41507888202Santirez  * with the usual full resync. */
masterTryPartialResynchronization(client * c)416554bd0e7Santirez int masterTryPartialResynchronization(client *c) {
41707888202Santirez     long long psync_offset, psync_len;
41807888202Santirez     char *master_runid = c->argv[1]->ptr;
4190ed6daa4Santirez     char buf[128];
4200ed6daa4Santirez     int buflen;
42107888202Santirez 
42207888202Santirez     /* Is the runid of this master the same advertised by the wannabe slave
42307888202Santirez      * via PSYNC? If runid changed this master is a different instance and
42407888202Santirez      * there is no way to continue. */
42507888202Santirez     if (strcasecmp(master_runid, server.runid)) {
42607888202Santirez         /* Run id "?" is used by slaves that want to force a full resync. */
42707888202Santirez         if (master_runid[0] != '?') {
42832f80e2fSantirez             serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
429707ff0f7Santirez                 "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
43007888202Santirez                 master_runid, server.runid);
43107888202Santirez         } else {
43232f80e2fSantirez             serverLog(LL_NOTICE,"Full resync requested by slave %s",
4334b8f4b90Santirez                 replicationGetSlaveName(c));
43407888202Santirez         }
43507888202Santirez         goto need_full_resync;
43607888202Santirez     }
43707888202Santirez 
43807888202Santirez     /* We still have the data our slave is asking for? */
43907888202Santirez     if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
44040eb548aSantirez        C_OK) goto need_full_resync;
44107888202Santirez     if (!server.repl_backlog ||
44207888202Santirez         psync_offset < server.repl_backlog_off ||
44337e06bd9Santirez         psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
44407888202Santirez     {
44532f80e2fSantirez         serverLog(LL_NOTICE,
4464b8f4b90Santirez             "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
44737e06bd9Santirez         if (psync_offset > server.master_repl_offset) {
44832f80e2fSantirez             serverLog(LL_WARNING,
4494b8f4b90Santirez                 "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
45037e06bd9Santirez         }
45107888202Santirez         goto need_full_resync;
45207888202Santirez     }
45307888202Santirez 
45407888202Santirez     /* If we reached this point, we are able to perform a partial resync:
45507888202Santirez      * 1) Set client state to make it a slave.
45607888202Santirez      * 2) Inform the client we can continue with +CONTINUE
45707888202Santirez      * 3) Send the backlog data (from the offset to the end) to the slave. */
45832f80e2fSantirez     c->flags |= CLIENT_SLAVE;
45932f80e2fSantirez     c->replstate = SLAVE_STATE_ONLINE;
4603c82c85fSantirez     c->repl_ack_time = server.unixtime;
461bb7fea0dSantirez     c->repl_put_online_on_ack = 0;
46207888202Santirez     listAddNodeTail(server.slaves,c);
4630ed6daa4Santirez     /* We can't use the connection buffers since they are used to accumulate
4640ed6daa4Santirez      * new commands at this stage. But we are sure the socket send buffer is
4653a82b8acSAaron Rutkovsky      * empty so this write will never fail actually. */
4660ed6daa4Santirez     buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
4670ed6daa4Santirez     if (write(c->fd,buf,buflen) != buflen) {
4680ed6daa4Santirez         freeClientAsync(c);
46940eb548aSantirez         return C_OK;
4700ed6daa4Santirez     }
47107888202Santirez     psync_len = addReplyReplicationBacklog(c,psync_offset);
47232f80e2fSantirez     serverLog(LL_NOTICE,
4734b8f4b90Santirez         "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
4744b8f4b90Santirez             replicationGetSlaveName(c),
4754b8f4b90Santirez             psync_len, psync_offset);
47607888202Santirez     /* Note that we don't need to set the selected DB at server.slaveseldb
47707888202Santirez      * to -1 to force the master to emit SELECT, since the slave already
47807888202Santirez      * has this state from the previous connection with the master. */
47907888202Santirez 
4801a54d596Santirez     refreshGoodSlavesCount();
48140eb548aSantirez     return C_OK; /* The caller can return, no full resync needed. */
48207888202Santirez 
48307888202Santirez need_full_resync:
484292fec05Santirez     /* We need a full resync for some reason... Note that we can't
485292fec05Santirez      * reply to PSYNC right now if a full SYNC is needed. The reply
486292fec05Santirez      * must include the master offset at the time the RDB file we transfer
487292fec05Santirez      * is generated, so we need to delay the reply to that moment. */
48840eb548aSantirez     return C_ERR;
48907888202Santirez }
49007888202Santirez 
49175f0cd65Santirez /* Start a BGSAVE for replication goals, which is, selecting the disk or
49275f0cd65Santirez  * socket target depending on the configuration, and making sure that
49375f0cd65Santirez  * the script cache is flushed before to start.
49475f0cd65Santirez  *
4953e6d4d59Santirez  * The mincapa argument is the bitwise AND among all the slaves capabilities
4963e6d4d59Santirez  * of the slaves waiting for this BGSAVE, so represents the slave capabilities
4973e6d4d59Santirez  * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
4983e6d4d59Santirez  *
499f18e5b63Santirez  * Side effects, other than starting a BGSAVE:
500f18e5b63Santirez  *
501f18e5b63Santirez  * 1) Handle the slaves in WAIT_START state, by preparing them for a full
502f18e5b63Santirez  *    sync if the BGSAVE was succesfully started, or sending them an error
503f18e5b63Santirez  *    and dropping them from the list of slaves.
504f18e5b63Santirez  *
505f18e5b63Santirez  * 2) Flush the Lua scripting script cache if the BGSAVE was actually
506f18e5b63Santirez  *    started.
507f18e5b63Santirez  *
50840eb548aSantirez  * Returns C_OK on success or C_ERR otherwise. */
startBgsaveForReplication(int mincapa)5093e6d4d59Santirez int startBgsaveForReplication(int mincapa) {
51075f0cd65Santirez     int retval;
511ce5761e0Santirez     int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
512f18e5b63Santirez     listIter li;
513f18e5b63Santirez     listNode *ln;
51475f0cd65Santirez 
51532f80e2fSantirez     serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
516ce5761e0Santirez         socket_target ? "slaves sockets" : "disk");
51775f0cd65Santirez 
518ce5761e0Santirez     if (socket_target)
51975f0cd65Santirez         retval = rdbSaveToSlavesSockets();
52075f0cd65Santirez     else
52175f0cd65Santirez         retval = rdbSaveBackground(server.rdb_filename);
52275f0cd65Santirez 
523f18e5b63Santirez     /* If we failed to BGSAVE, remove the slaves waiting for a full
524f18e5b63Santirez      * resynchorinization from the list of salves, inform them with
525f18e5b63Santirez      * an error about what happened, close the connection ASAP. */
526f18e5b63Santirez     if (retval == C_ERR) {
527f18e5b63Santirez         serverLog(LL_WARNING,"BGSAVE for replication failed");
528f18e5b63Santirez         listRewind(server.slaves,&li);
529f18e5b63Santirez         while((ln = listNext(&li))) {
530f18e5b63Santirez             client *slave = ln->value;
531f18e5b63Santirez 
532f18e5b63Santirez             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
533f18e5b63Santirez                 slave->flags &= ~CLIENT_SLAVE;
534f18e5b63Santirez                 listDelNode(server.slaves,ln);
535f18e5b63Santirez                 addReplyError(slave,
536f18e5b63Santirez                     "BGSAVE failed, replication can't continue");
537f18e5b63Santirez                 slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
538f18e5b63Santirez             }
539f18e5b63Santirez         }
540f18e5b63Santirez         return retval;
541f18e5b63Santirez     }
542f18e5b63Santirez 
543f18e5b63Santirez     /* If the target is socket, rdbSaveToSlavesSockets() already setup
544f18e5b63Santirez      * the salves for a full resync. Otherwise for disk target do it now.*/
545f18e5b63Santirez     if (!socket_target) {
546f18e5b63Santirez         listRewind(server.slaves,&li);
547f18e5b63Santirez         while((ln = listNext(&li))) {
548f18e5b63Santirez             client *slave = ln->value;
549f18e5b63Santirez 
550f18e5b63Santirez             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
551f18e5b63Santirez                     replicationSetupSlaveForFullResync(slave,
552f18e5b63Santirez                             getPsyncInitialOffset());
553f18e5b63Santirez             }
554f18e5b63Santirez         }
555f18e5b63Santirez     }
556f18e5b63Santirez 
55775f0cd65Santirez     /* Flush the script cache, since we need that slave differences are
55875f0cd65Santirez      * accumulated without requiring slaves to match our cached scripts. */
55940eb548aSantirez     if (retval == C_OK) replicationScriptCacheFlush();
56075f0cd65Santirez     return retval;
56175f0cd65Santirez }
56275f0cd65Santirez 
5639f98b29cSJan-Erik Rediger /* SYNC and PSYNC command implemenation. */
syncCommand(client * c)564554bd0e7Santirez void syncCommand(client *c) {
5659d09ce39Sguiquanz     /* ignore SYNC if already slave or in monitor mode */
56632f80e2fSantirez     if (c->flags & CLIENT_SLAVE) return;
567e2641e09Santirez 
568778b2210Santirez     /* Refuse SYNC requests if we are a slave but the link with our master
569778b2210Santirez      * is not ok... */
57032f80e2fSantirez     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
5713ab20376SPieter Noordhuis         addReplyError(c,"Can't SYNC while not connected with my master");
572778b2210Santirez         return;
573778b2210Santirez     }
574778b2210Santirez 
575e2641e09Santirez     /* SYNC can't be issued when the server has pending data to send to
576e2641e09Santirez      * the client about already issued commands. We need a fresh reply
577e2641e09Santirez      * buffer registering the differences between the BGSAVE and the current
578e2641e09Santirez      * dataset, so that we can copy to other slaves if needed. */
5792d21af45Santirez     if (clientHasPendingReplies(c)) {
580d2a0348aSantirez         addReplyError(c,"SYNC and PSYNC are invalid with pending output");
581e2641e09Santirez         return;
582e2641e09Santirez     }
583e2641e09Santirez 
58432f80e2fSantirez     serverLog(LL_NOTICE,"Slave %s asks for synchronization",
5854b8f4b90Santirez         replicationGetSlaveName(c));
58607888202Santirez 
58707888202Santirez     /* Try a partial resynchronization if this is a PSYNC command.
58807888202Santirez      * If it fails, we continue with usual full resynchronization, however
58907888202Santirez      * when this happens masterTryPartialResynchronization() already
59007888202Santirez      * replied with:
59107888202Santirez      *
59207888202Santirez      * +FULLRESYNC <runid> <offset>
59307888202Santirez      *
59407888202Santirez      * So the slave knows the new runid and offset to try a PSYNC later
59507888202Santirez      * if the connection with the master is lost. */
59624f25836Santirez     if (!strcasecmp(c->argv[0]->ptr,"psync")) {
59740eb548aSantirez         if (masterTryPartialResynchronization(c) == C_OK) {
59824f25836Santirez             server.stat_sync_partial_ok++;
59924f25836Santirez             return; /* No full resync needed, return. */
60024f25836Santirez         } else {
60124f25836Santirez             char *master_runid = c->argv[1]->ptr;
60224f25836Santirez 
60324f25836Santirez             /* Increment stats for failed PSYNCs, but only if the
60424f25836Santirez              * runid is not "?", as this is used by slaves to force a full
60524f25836Santirez              * resync on purpose when they are not albe to partially
60624f25836Santirez              * resync. */
60724f25836Santirez             if (master_runid[0] != '?') server.stat_sync_partial_err++;
60824f25836Santirez         }
6098ca265cdSantirez     } else {
6108ca265cdSantirez         /* If a slave uses SYNC, we are dealing with an old implementation
6118ca265cdSantirez          * of the replication protocol (like redis-cli --slave). Flag the client
6128ca265cdSantirez          * so that we don't expect to receive REPLCONF ACK feedbacks. */
61332f80e2fSantirez         c->flags |= CLIENT_PRE_PSYNC;
61424f25836Santirez     }
61524f25836Santirez 
61624f25836Santirez     /* Full resynchronization. */
61724f25836Santirez     server.stat_sync_full++;
61807888202Santirez 
619f18e5b63Santirez     /* Setup the slave as one waiting for BGSAVE to start. The following code
620f18e5b63Santirez      * paths will change the state if we handle the slave differently. */
621f18e5b63Santirez     c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
622f18e5b63Santirez     if (server.repl_disable_tcp_nodelay)
623f18e5b63Santirez         anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
624f18e5b63Santirez     c->repldbfd = -1;
625f18e5b63Santirez     c->flags |= CLIENT_SLAVE;
626f18e5b63Santirez     listAddNodeTail(server.slaves,c);
62762b5c60eSantirez 
6283e6d4d59Santirez     /* CASE 1: BGSAVE is in progress, with disk target. */
62975f0cd65Santirez     if (server.rdb_child_pid != -1 &&
63032f80e2fSantirez         server.rdb_child_type == RDB_CHILD_TYPE_DISK)
63175f0cd65Santirez     {
632e2641e09Santirez         /* Ok a background save is in progress. Let's check if it is a good
633e2641e09Santirez          * one for replication, i.e. if there is another slave that is
63416546f5aSantirez          * registering differences since the server forked to save. */
635554bd0e7Santirez         client *slave;
636e2641e09Santirez         listNode *ln;
637e2641e09Santirez         listIter li;
638e2641e09Santirez 
639e2641e09Santirez         listRewind(server.slaves,&li);
640e2641e09Santirez         while((ln = listNext(&li))) {
641e2641e09Santirez             slave = ln->value;
64232f80e2fSantirez             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
643e2641e09Santirez         }
6443e6d4d59Santirez         /* To attach this slave, we check that it has at least all the
6453e6d4d59Santirez          * capabilities of the slave that triggered the current BGSAVE. */
6463e6d4d59Santirez         if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
647e2641e09Santirez             /* Perfect, the server is already registering differences for
64875f0cd65Santirez              * another slave. Set the right state, and copy the buffer. */
6491824e3a3Santirez             copyClientOutputBuffer(c,slave);
65015de6b10Santirez             replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
65132f80e2fSantirez             serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
652e2641e09Santirez         } else {
653e2641e09Santirez             /* No way, we need to wait for the next BGSAVE in order to
65416546f5aSantirez              * register differences. */
655017378ecSantirez             serverLog(LL_NOTICE,"Can't attach the slave to the current BGSAVE. Waiting for next BGSAVE for SYNC");
656e2641e09Santirez         }
65762b5c60eSantirez 
6583e6d4d59Santirez     /* CASE 2: BGSAVE is in progress, with socket target. */
65975f0cd65Santirez     } else if (server.rdb_child_pid != -1 &&
66032f80e2fSantirez                server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
66175f0cd65Santirez     {
66275f0cd65Santirez         /* There is an RDB child process but it is writing directly to
66375f0cd65Santirez          * children sockets. We need to wait for the next BGSAVE
66475f0cd65Santirez          * in order to synchronize. */
665017378ecSantirez         serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
66662b5c60eSantirez 
66762b5c60eSantirez     /* CASE 3: There is no BGSAVE is progress. */
66875f0cd65Santirez     } else {
6693e6d4d59Santirez         if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
67075f0cd65Santirez             /* Diskless replication RDB child is created inside
67175f0cd65Santirez              * replicationCron() since we want to delay its start a
67275f0cd65Santirez              * few seconds to wait for more slaves to arrive. */
673a27befc4Santirez             if (server.repl_diskless_sync_delay)
674a1bfe22aSantirez                 serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
675e2641e09Santirez         } else {
6763e6d4d59Santirez             /* Target is disk (or the slave is not capable of supporting
6773e6d4d59Santirez              * diskless replication) and we don't have a BGSAVE in progress,
67862b5c60eSantirez              * let's start one. */
679*e67ad1d1SQu Chen             if (server.aof_child_pid == -1) {
680a1bfe22aSantirez                 startBgsaveForReplication(c->slave_capa);
681a1bfe22aSantirez             } else {
682a1bfe22aSantirez                 serverLog(LL_NOTICE,
683a1bfe22aSantirez                     "No BGSAVE in progress, but an AOF rewrite is active. "
684a1bfe22aSantirez                     "BGSAVE for replication delayed");
685a1bfe22aSantirez             }
68675f0cd65Santirez         }
687e2641e09Santirez     }
688b70b459bSantirez 
68907888202Santirez     if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
69007888202Santirez         createReplicationBacklog();
691e2641e09Santirez     return;
692e2641e09Santirez }
693e2641e09Santirez 
6943a328978Santirez /* REPLCONF <option> <value> <option> <value> ...
6953a328978Santirez  * This command is used by a slave in order to configure the replication
6963a328978Santirez  * process before starting it with the SYNC command.
6973a328978Santirez  *
6983a328978Santirez  * Currently the only use of this command is to communicate to the master
6993a328978Santirez  * what is the listening port of the Slave redis instance, so that the
7003a328978Santirez  * master can accurately list slaves and their listening ports in
7013a328978Santirez  * the INFO output.
7023a328978Santirez  *
7033a328978Santirez  * In the future the same command can be used in order to configure
7043a328978Santirez  * the replication to initiate an incremental replication instead of a
7053a328978Santirez  * full resync. */
replconfCommand(client * c)706554bd0e7Santirez void replconfCommand(client *c) {
7073a328978Santirez     int j;
7083a328978Santirez 
7093a328978Santirez     if ((c->argc % 2) == 0) {
7103a328978Santirez         /* Number of arguments must be odd to make sure that every
7113a328978Santirez          * option has a corresponding value. */
7123a328978Santirez         addReply(c,shared.syntaxerr);
7133a328978Santirez         return;
7143a328978Santirez     }
7153a328978Santirez 
7163a328978Santirez     /* Process every option-value pair. */
7173a328978Santirez     for (j = 1; j < c->argc; j+=2) {
7183a328978Santirez         if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
7193a328978Santirez             long port;
7203a328978Santirez 
7213a328978Santirez             if ((getLongFromObjectOrReply(c,c->argv[j+1],
72240eb548aSantirez                     &port,NULL) != C_OK))
7233a328978Santirez                 return;
7243a328978Santirez             c->slave_listening_port = port;
7250a45fbc3Santirez         } else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
7260a45fbc3Santirez             sds ip = c->argv[j+1]->ptr;
7270a45fbc3Santirez             if (sdslen(ip) < sizeof(c->slave_ip)) {
7280a45fbc3Santirez                 memcpy(c->slave_ip,ip,sdslen(ip)+1);
7290a45fbc3Santirez             } else {
7300a45fbc3Santirez                 addReplyErrorFormat(c,"REPLCONF ip-address provided by "
7310a45fbc3Santirez                     "slave instance is too long: %zd bytes", sdslen(ip));
7320a45fbc3Santirez                 return;
7330a45fbc3Santirez             }
7343e6d4d59Santirez         } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
7353e6d4d59Santirez             /* Ignore capabilities not understood by this master. */
7363e6d4d59Santirez             if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
7373e6d4d59Santirez                 c->slave_capa |= SLAVE_CAPA_EOF;
7386b4635f4Santirez         } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
7396b4635f4Santirez             /* REPLCONF ACK is used by slave to inform the master the amount
7406b4635f4Santirez              * of replication stream that it processed so far. It is an
7416b4635f4Santirez              * internal only command that normal clients should never use. */
7426b4635f4Santirez             long long offset;
7436b4635f4Santirez 
74432f80e2fSantirez             if (!(c->flags & CLIENT_SLAVE)) return;
74540eb548aSantirez             if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
7466b4635f4Santirez                 return;
7476b4635f4Santirez             if (offset > c->repl_ack_off)
7486b4635f4Santirez                 c->repl_ack_off = offset;
7496b4635f4Santirez             c->repl_ack_time = server.unixtime;
750bb7fea0dSantirez             /* If this was a diskless replication, we need to really put
751bb7fea0dSantirez              * the slave online when the first ACK is received (which
752bb7fea0dSantirez              * confirms slave is online and ready to get more data). */
75332f80e2fSantirez             if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
754bb7fea0dSantirez                 putSlaveOnline(c);
7556b4635f4Santirez             /* Note: this command does not reply anything! */
756dd0adbb7Santirez             return;
757c5618e7fSantirez         } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
758c5618e7fSantirez             /* REPLCONF GETACK is used in order to request an ACK ASAP
759c5618e7fSantirez              * to the slave. */
760c5618e7fSantirez             if (server.masterhost && server.master) replicationSendAck();
761c5618e7fSantirez             /* Note: this command does not reply anything! */
7623a328978Santirez         } else {
7633a328978Santirez             addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
7643a328978Santirez                 (char*)c->argv[j]->ptr);
7653a328978Santirez             return;
7663a328978Santirez         }
7673a328978Santirez     }
7683a328978Santirez     addReply(c,shared.ok);
7693a328978Santirez }
7703a328978Santirez 
7713730d118Santirez /* This function puts a slave in the online state, and should be called just
7723730d118Santirez  * after a slave received the RDB file for the initial synchronization, and
7733730d118Santirez  * we are finally ready to send the incremental stream of commands.
7743730d118Santirez  *
7753730d118Santirez  * It does a few things:
7763730d118Santirez  *
7776c60526dSantirez  * 1) Put the slave in ONLINE state (useless when the function is called
7786c60526dSantirez  *    because state is already ONLINE but repl_put_online_on_ack is true).
7793730d118Santirez  * 2) Make sure the writable event is re-installed, since calling the SYNC
7803730d118Santirez  *    command disables it, so that we can accumulate output buffer without
7813730d118Santirez  *    sending it to the slave.
7823730d118Santirez  * 3) Update the count of good slaves. */
putSlaveOnline(client * slave)783554bd0e7Santirez void putSlaveOnline(client *slave) {
78432f80e2fSantirez     slave->replstate = SLAVE_STATE_ONLINE;
785bb7fea0dSantirez     slave->repl_put_online_on_ack = 0;
7866c60526dSantirez     slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
7873730d118Santirez     if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
7883730d118Santirez         sendReplyToClient, slave) == AE_ERR) {
78932f80e2fSantirez         serverLog(LL_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
7903730d118Santirez         freeClient(slave);
7913730d118Santirez         return;
7923730d118Santirez     }
7933730d118Santirez     refreshGoodSlavesCount();
79432f80e2fSantirez     serverLog(LL_NOTICE,"Synchronization with slave %s succeeded",
795bb7fea0dSantirez         replicationGetSlaveName(slave));
7963730d118Santirez }
7973730d118Santirez 
sendBulkToSlave(aeEventLoop * el,int fd,void * privdata,int mask)798e2641e09Santirez void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
799554bd0e7Santirez     client *slave = privdata;
80032f80e2fSantirez     UNUSED(el);
80132f80e2fSantirez     UNUSED(mask);
80232f80e2fSantirez     char buf[PROTO_IOBUF_LEN];
803e2641e09Santirez     ssize_t nwritten, buflen;
804e2641e09Santirez 
80589ffba91Santirez     /* Before sending the RDB file, we send the preamble as configured by the
80689ffba91Santirez      * replication process. Currently the preamble is just the bulk count of
80789ffba91Santirez      * the file in the form "$<length>\r\n". */
80889ffba91Santirez     if (slave->replpreamble) {
80989ffba91Santirez         nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
81089ffba91Santirez         if (nwritten == -1) {
81132f80e2fSantirez             serverLog(LL_VERBOSE,"Write error sending RDB preamble to slave: %s",
81289ffba91Santirez                 strerror(errno));
813e2641e09Santirez             freeClient(slave);
814e2641e09Santirez             return;
815e2641e09Santirez         }
8161b732c09Santirez         server.stat_net_output_bytes += nwritten;
81789ffba91Santirez         sdsrange(slave->replpreamble,nwritten,-1);
81889ffba91Santirez         if (sdslen(slave->replpreamble) == 0) {
81989ffba91Santirez             sdsfree(slave->replpreamble);
82089ffba91Santirez             slave->replpreamble = NULL;
82189ffba91Santirez             /* fall through sending data. */
82289ffba91Santirez         } else {
82389ffba91Santirez             return;
824e2641e09Santirez         }
82589ffba91Santirez     }
82689ffba91Santirez 
82789ffba91Santirez     /* If the preamble was already transfered, send the RDB bulk data. */
828e2641e09Santirez     lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
82932f80e2fSantirez     buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN);
830e2641e09Santirez     if (buflen <= 0) {
83132f80e2fSantirez         serverLog(LL_WARNING,"Read error sending DB to slave: %s",
832e2641e09Santirez             (buflen == 0) ? "premature EOF" : strerror(errno));
833e2641e09Santirez         freeClient(slave);
834e2641e09Santirez         return;
835e2641e09Santirez     }
836e2641e09Santirez     if ((nwritten = write(fd,buf,buflen)) == -1) {
837970de3e9Santirez         if (errno != EAGAIN) {
83832f80e2fSantirez             serverLog(LL_WARNING,"Write error sending DB to slave: %s",
839e2641e09Santirez                 strerror(errno));
840e2641e09Santirez             freeClient(slave);
841970de3e9Santirez         }
842e2641e09Santirez         return;
843e2641e09Santirez     }
844e2641e09Santirez     slave->repldboff += nwritten;
8451b732c09Santirez     server.stat_net_output_bytes += nwritten;
846e2641e09Santirez     if (slave->repldboff == slave->repldbsize) {
847e2641e09Santirez         close(slave->repldbfd);
848e2641e09Santirez         slave->repldbfd = -1;
849e2641e09Santirez         aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
8503730d118Santirez         putSlaveOnline(slave);
851e2641e09Santirez     }
852e2641e09Santirez }
853e2641e09Santirez 
85442951ab3Santirez /* This function is called at the end of every background saving,
85542951ab3Santirez  * or when the replication RDB transfer strategy is modified from
85642951ab3Santirez  * disk to socket or the other way around.
857e2641e09Santirez  *
858e2641e09Santirez  * The goal of this function is to handle slaves waiting for a successful
85975f0cd65Santirez  * background saving in order to perform non-blocking synchronization, and
86075f0cd65Santirez  * to schedule a new BGSAVE if there are slaves that attached while a
86175f0cd65Santirez  * BGSAVE was in progress, but it was not a good one for replication (no
86242951ab3Santirez  * other slave was accumulating differences).
86342951ab3Santirez  *
86440eb548aSantirez  * The argument bgsaveerr is C_OK if the background saving succeeded
86540eb548aSantirez  * otherwise C_ERR is passed to the function.
86642951ab3Santirez  * The 'type' argument is the type of the child that terminated
86742951ab3Santirez  * (if it had a disk or socket target). */
updateSlavesWaitingBgsave(int bgsaveerr,int type)86875f0cd65Santirez void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
869e2641e09Santirez     listNode *ln;
870e2641e09Santirez     int startbgsave = 0;
8713e6d4d59Santirez     int mincapa = -1;
872e2641e09Santirez     listIter li;
873e2641e09Santirez 
874e2641e09Santirez     listRewind(server.slaves,&li);
875e2641e09Santirez     while((ln = listNext(&li))) {
876554bd0e7Santirez         client *slave = ln->value;
877e2641e09Santirez 
87832f80e2fSantirez         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
879e2641e09Santirez             startbgsave = 1;
8803e6d4d59Santirez             mincapa = (mincapa == -1) ? slave->slave_capa :
8813e6d4d59Santirez                                         (mincapa & slave->slave_capa);
88232f80e2fSantirez         } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
883e2641e09Santirez             struct redis_stat buf;
884e2641e09Santirez 
8853730d118Santirez             /* If this was an RDB on disk save, we have to prepare to send
8863730d118Santirez              * the RDB from disk to the slave socket. Otherwise if this was
8873730d118Santirez              * already an RDB -> Slaves socket transfer, used in the case of
8883730d118Santirez              * diskless replication, our work is trivial, we can just put
8893730d118Santirez              * the slave online. */
89032f80e2fSantirez             if (type == RDB_CHILD_TYPE_SOCKET) {
89132f80e2fSantirez                 serverLog(LL_NOTICE,
892bb7fea0dSantirez                     "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
8934b8f4b90Santirez                         replicationGetSlaveName(slave));
894bb7fea0dSantirez                 /* Note: we wait for a REPLCONF ACK message from slave in
895bb7fea0dSantirez                  * order to really put it online (install the write handler
896bb7fea0dSantirez                  * so that the accumulated data can be transfered). However
897bb7fea0dSantirez                  * we change the replication state ASAP, since our slave
898bb7fea0dSantirez                  * is technically online now. */
89932f80e2fSantirez                 slave->replstate = SLAVE_STATE_ONLINE;
900bb7fea0dSantirez                 slave->repl_put_online_on_ack = 1;
9016c60526dSantirez                 slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
9023730d118Santirez             } else {
90340eb548aSantirez                 if (bgsaveerr != C_OK) {
904e2641e09Santirez                     freeClient(slave);
90532f80e2fSantirez                     serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
906e2641e09Santirez                     continue;
907e2641e09Santirez                 }
908f48cd4b9Santirez                 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
909e2641e09Santirez                     redis_fstat(slave->repldbfd,&buf) == -1) {
910e2641e09Santirez                     freeClient(slave);
91132f80e2fSantirez                     serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
912e2641e09Santirez                     continue;
913e2641e09Santirez                 }
914e2641e09Santirez                 slave->repldboff = 0;
915e2641e09Santirez                 slave->repldbsize = buf.st_size;
91632f80e2fSantirez                 slave->replstate = SLAVE_STATE_SEND_BULK;
91789ffba91Santirez                 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
91889ffba91Santirez                     (unsigned long long) slave->repldbsize);
91989ffba91Santirez 
920e2641e09Santirez                 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
921e2641e09Santirez                 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
922e2641e09Santirez                     freeClient(slave);
923e2641e09Santirez                     continue;
924e2641e09Santirez                 }
925e2641e09Santirez             }
926e2641e09Santirez         }
9273730d118Santirez     }
928f18e5b63Santirez     if (startbgsave) startBgsaveForReplication(mincapa);
929e2641e09Santirez }
930e2641e09Santirez 
931f4aa600bSantirez /* ----------------------------------- SLAVE -------------------------------- */
932f4aa600bSantirez 
933e3344b80Santirez /* Returns 1 if the given replication state is a handshake state,
934e3344b80Santirez  * 0 otherwise. */
slaveIsInHandshakeState(void)935e3344b80Santirez int slaveIsInHandshakeState(void) {
936e3344b80Santirez     return server.repl_state >= REPL_STATE_RECEIVE_PONG &&
937e3344b80Santirez            server.repl_state <= REPL_STATE_RECEIVE_PSYNC;
938e3344b80Santirez }
939e3344b80Santirez 
94011120689Santirez /* Avoid the master to detect the slave is timing out while loading the
94111120689Santirez  * RDB file in initial synchronization. We send a single newline character
94211120689Santirez  * that is valid protocol but is guaranteed to either be sent entierly or
94311120689Santirez  * not, since the byte is indivisible.
94411120689Santirez  *
94511120689Santirez  * The function is called in two contexts: while we flush the current
94611120689Santirez  * data with emptyDb(), and while we load the new data received as an
94711120689Santirez  * RDB file from the master. */
replicationSendNewlineToMaster(void)94811120689Santirez void replicationSendNewlineToMaster(void) {
94911120689Santirez     static time_t newline_sent;
95011120689Santirez     if (time(NULL) != newline_sent) {
95111120689Santirez         newline_sent = time(NULL);
95211120689Santirez         if (write(server.repl_transfer_s,"\n",1) == -1) {
95311120689Santirez             /* Pinging back in this stage is best-effort. */
95411120689Santirez         }
95511120689Santirez     }
95611120689Santirez }
95711120689Santirez 
95811120689Santirez /* Callback used by emptyDb() while flushing away old data to load
95911120689Santirez  * the new dataset received by the master. */
replicationEmptyDbCallback(void * privdata)96011120689Santirez void replicationEmptyDbCallback(void *privdata) {
96132f80e2fSantirez     UNUSED(privdata);
96211120689Santirez     replicationSendNewlineToMaster();
96311120689Santirez }
96411120689Santirez 
965c5dd686eSantirez /* Once we have a link with the master and the synchroniziation was
966c5dd686eSantirez  * performed, this function materializes the master client we store
967c5dd686eSantirez  * at server.master, starting from the specified file descriptor. */
replicationCreateMasterClient(int fd)968c5dd686eSantirez void replicationCreateMasterClient(int fd) {
969c5dd686eSantirez     server.master = createClient(fd);
97032f80e2fSantirez     server.master->flags |= CLIENT_MASTER;
971c5dd686eSantirez     server.master->authenticated = 1;
97232f80e2fSantirez     server.repl_state = REPL_STATE_CONNECTED;
973c5dd686eSantirez     server.master->reploff = server.repl_master_initial_offset;
974c5dd686eSantirez     memcpy(server.master->replrunid, server.repl_master_runid,
975c5dd686eSantirez         sizeof(server.repl_master_runid));
976c5dd686eSantirez     /* If master offset is set to -1, this master is old and is not
977c5dd686eSantirez      * PSYNC capable, so we flag it accordingly. */
978c5dd686eSantirez     if (server.master->reploff == -1)
97932f80e2fSantirez         server.master->flags |= CLIENT_PRE_PSYNC;
980c5dd686eSantirez }
981c5dd686eSantirez 
982f4aa600bSantirez /* Asynchronously read the SYNC payload we receive from a master */
983784b9308Santirez #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
readSyncBulkPayload(aeEventLoop * el,int fd,void * privdata,int mask)984f4aa600bSantirez void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
98526b33669Santirez     char buf[4096];
98662ec599cSantirez     ssize_t nread, readlen;
987784b9308Santirez     off_t left;
98832f80e2fSantirez     UNUSED(el);
98932f80e2fSantirez     UNUSED(privdata);
99032f80e2fSantirez     UNUSED(mask);
991f4aa600bSantirez 
9925ee2ccf4Santirez     /* Static vars used to hold the EOF mark, and the last bytes received
9935ee2ccf4Santirez      * form the server: when they match, we reached the end of the transfer. */
99432f80e2fSantirez     static char eofmark[CONFIG_RUN_ID_SIZE];
99532f80e2fSantirez     static char lastbytes[CONFIG_RUN_ID_SIZE];
9965ee2ccf4Santirez     static int usemark = 0;
9975ee2ccf4Santirez 
998784b9308Santirez     /* If repl_transfer_size == -1 we still have to read the bulk length
99926b33669Santirez      * from the master reply. */
1000784b9308Santirez     if (server.repl_transfer_size == -1) {
10019157549fSantirez         if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
100232f80e2fSantirez             serverLog(LL_WARNING,
100326b33669Santirez                 "I/O error reading bulk count from MASTER: %s",
100426b33669Santirez                 strerror(errno));
1005b075621fSPieter Noordhuis             goto error;
100626b33669Santirez         }
1007b075621fSPieter Noordhuis 
100826b33669Santirez         if (buf[0] == '-') {
100932f80e2fSantirez             serverLog(LL_WARNING,
101026b33669Santirez                 "MASTER aborted replication with an error: %s",
101126b33669Santirez                 buf+1);
1012b075621fSPieter Noordhuis             goto error;
101389a1433eSantirez         } else if (buf[0] == '\0') {
101489a1433eSantirez             /* At this stage just a newline works as a PING in order to take
101589a1433eSantirez              * the connection live. So we refresh our last interaction
101689a1433eSantirez              * timestamp. */
1017d1949054SPremysl Hruby             server.repl_transfer_lastio = server.unixtime;
101889a1433eSantirez             return;
101926b33669Santirez         } else if (buf[0] != '$') {
102032f80e2fSantirez             serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
1021b075621fSPieter Noordhuis             goto error;
102226b33669Santirez         }
10235ee2ccf4Santirez 
10245ee2ccf4Santirez         /* There are two possible forms for the bulk payload. One is the
10255ee2ccf4Santirez          * usual $<count> bulk format. The other is used for diskless transfers
10265ee2ccf4Santirez          * when the master does not know beforehand the size of the file to
10275ee2ccf4Santirez          * transfer. In the latter case, the following format is used:
10285ee2ccf4Santirez          *
10295ee2ccf4Santirez          * $EOF:<40 bytes delimiter>
10305ee2ccf4Santirez          *
10315ee2ccf4Santirez          * At the end of the file the announced delimiter is transmitted. The
10325ee2ccf4Santirez          * delimiter is long and random enough that the probability of a
10335ee2ccf4Santirez          * collision with the actual file content can be ignored. */
103432f80e2fSantirez         if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
10355ee2ccf4Santirez             usemark = 1;
103632f80e2fSantirez             memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
103732f80e2fSantirez             memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
103880f7f63bSantirez             /* Set any repl_transfer_size to avoid entering this code path
103980f7f63bSantirez              * at the next call. */
104080f7f63bSantirez             server.repl_transfer_size = 0;
104132f80e2fSantirez             serverLog(LL_NOTICE,
10425ee2ccf4Santirez                 "MASTER <-> SLAVE sync: receiving streamed RDB from master");
10435ee2ccf4Santirez         } else {
10445ee2ccf4Santirez             usemark = 0;
1045784b9308Santirez             server.repl_transfer_size = strtol(buf+1,NULL,10);
104632f80e2fSantirez             serverLog(LL_NOTICE,
1047f9b5ca29Santirez                 "MASTER <-> SLAVE sync: receiving %lld bytes from master",
1048f9b5ca29Santirez                 (long long) server.repl_transfer_size);
10495ee2ccf4Santirez         }
105026b33669Santirez         return;
105126b33669Santirez     }
105226b33669Santirez 
105326b33669Santirez     /* Read bulk data */
10545ee2ccf4Santirez     if (usemark) {
10550c5a06f6Santirez         readlen = sizeof(buf);
10560c5a06f6Santirez     } else {
1057784b9308Santirez         left = server.repl_transfer_size - server.repl_transfer_read;
1058784b9308Santirez         readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
10595ee2ccf4Santirez     }
10605ee2ccf4Santirez 
1061f4aa600bSantirez     nread = read(fd,buf,readlen);
1062f4aa600bSantirez     if (nread <= 0) {
106332f80e2fSantirez         serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
1064f4aa600bSantirez             (nread == -1) ? strerror(errno) : "connection lost");
1065c5f8c80aSantirez         cancelReplicationHandshake();
1066f4aa600bSantirez         return;
1067f4aa600bSantirez     }
10681b732c09Santirez     server.stat_net_input_bytes += nread;
10695ee2ccf4Santirez 
10705ee2ccf4Santirez     /* When a mark is used, we want to detect EOF asap in order to avoid
10715ee2ccf4Santirez      * writing the EOF mark into the file... */
10725ee2ccf4Santirez     int eof_reached = 0;
10735ee2ccf4Santirez 
10745ee2ccf4Santirez     if (usemark) {
10755ee2ccf4Santirez         /* Update the last bytes array, and check if it matches our delimiter.*/
107632f80e2fSantirez         if (nread >= CONFIG_RUN_ID_SIZE) {
107732f80e2fSantirez             memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
10785ee2ccf4Santirez         } else {
107932f80e2fSantirez             int rem = CONFIG_RUN_ID_SIZE-nread;
10805ee2ccf4Santirez             memmove(lastbytes,lastbytes+nread,rem);
10815ee2ccf4Santirez             memcpy(lastbytes+rem,buf,nread);
10825ee2ccf4Santirez         }
108332f80e2fSantirez         if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
10845ee2ccf4Santirez     }
10855ee2ccf4Santirez 
1086d1949054SPremysl Hruby     server.repl_transfer_lastio = server.unixtime;
1087f4aa600bSantirez     if (write(server.repl_transfer_fd,buf,nread) != nread) {
108832f80e2fSantirez         serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
1089b075621fSPieter Noordhuis         goto error;
1090f4aa600bSantirez     }
1091784b9308Santirez     server.repl_transfer_read += nread;
1092784b9308Santirez 
109325a3d996Santirez     /* Delete the last 40 bytes from the file if we reached EOF. */
109425a3d996Santirez     if (usemark && eof_reached) {
109525a3d996Santirez         if (ftruncate(server.repl_transfer_fd,
109632f80e2fSantirez             server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
109725a3d996Santirez         {
109832f80e2fSantirez             serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
109925a3d996Santirez             goto error;
110025a3d996Santirez         }
110125a3d996Santirez     }
110225a3d996Santirez 
1103784b9308Santirez     /* Sync data on disk from time to time, otherwise at the end of the transfer
1104784b9308Santirez      * we may suffer a big delay as the memory buffers are copied into the
1105784b9308Santirez      * actual disk. */
1106784b9308Santirez     if (server.repl_transfer_read >=
1107784b9308Santirez         server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
1108784b9308Santirez     {
1109784b9308Santirez         off_t sync_size = server.repl_transfer_read -
1110784b9308Santirez                           server.repl_transfer_last_fsync_off;
1111784b9308Santirez         rdb_fsync_range(server.repl_transfer_fd,
1112784b9308Santirez             server.repl_transfer_last_fsync_off, sync_size);
1113784b9308Santirez         server.repl_transfer_last_fsync_off += sync_size;
1114784b9308Santirez     }
1115784b9308Santirez 
1116f4aa600bSantirez     /* Check if the transfer is now complete */
11175ee2ccf4Santirez     if (!usemark) {
11185ee2ccf4Santirez         if (server.repl_transfer_read == server.repl_transfer_size)
11195ee2ccf4Santirez             eof_reached = 1;
11205ee2ccf4Santirez     }
11215ee2ccf4Santirez 
11225ee2ccf4Santirez     if (eof_reached) {
1123f48cd4b9Santirez         if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
112432f80e2fSantirez             serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
1125c5f8c80aSantirez             cancelReplicationHandshake();
1126f4aa600bSantirez             return;
1127f4aa600bSantirez         }
112832f80e2fSantirez         serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
1129a6c2f901Santirez         signalFlushedDb(-1);
113011120689Santirez         emptyDb(replicationEmptyDbCallback);
11319fd01051Santirez         /* Before loading the DB into memory we need to delete the readable
11329fd01051Santirez          * handler, otherwise it will get called recursively since
11339fd01051Santirez          * rdbLoad() will call the event loop to process events from time to
11349fd01051Santirez          * time for non blocking loading. */
11359fd01051Santirez         aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
113632f80e2fSantirez         serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
113740eb548aSantirez         if (rdbLoad(server.rdb_filename) != C_OK) {
113832f80e2fSantirez             serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
1139c5f8c80aSantirez             cancelReplicationHandshake();
1140f4aa600bSantirez             return;
1141f4aa600bSantirez         }
1142f4aa600bSantirez         /* Final setup of the connected slave <- master link */
1143f4aa600bSantirez         zfree(server.repl_transfer_tmpfile);
1144f4aa600bSantirez         close(server.repl_transfer_fd);
1145c5dd686eSantirez         replicationCreateMasterClient(server.repl_transfer_s);
114632f80e2fSantirez         serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
1147e7a2e7c1Santirez         /* Restart the AOF subsystem now that we finished the sync. This
1148e7a2e7c1Santirez          * will trigger an AOF rewrite, and when done will start appending
1149e7a2e7c1Santirez          * to the new file. */
115032f80e2fSantirez         if (server.aof_state != AOF_OFF) {
1151e7a2e7c1Santirez             int retry = 10;
1152e7a2e7c1Santirez 
1153e7a2e7c1Santirez             stopAppendOnly();
115440eb548aSantirez             while (retry-- && startAppendOnly() == C_ERR) {
115532f80e2fSantirez                 serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
1156e7a2e7c1Santirez                 sleep(1);
1157e7a2e7c1Santirez             }
1158e7a2e7c1Santirez             if (!retry) {
115932f80e2fSantirez                 serverLog(LL_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
1160e7a2e7c1Santirez                 exit(1);
1161e7a2e7c1Santirez             }
1162e7a2e7c1Santirez         }
1163f4aa600bSantirez     }
1164b075621fSPieter Noordhuis 
1165b075621fSPieter Noordhuis     return;
1166b075621fSPieter Noordhuis 
1167b075621fSPieter Noordhuis error:
1168c5f8c80aSantirez     cancelReplicationHandshake();
1169b075621fSPieter Noordhuis     return;
1170f4aa600bSantirez }
1171f4aa600bSantirez 
11723a328978Santirez /* Send a synchronous command to the master. Used to send AUTH and
117336def8fdSantirez  * REPLCONF commands before starting the replication with SYNC.
11743a328978Santirez  *
117507888202Santirez  * The command returns an sds string representing the result of the
117607888202Santirez  * operation. On error the first byte is a "-".
11773a328978Santirez  */
117888c716a0Santirez #define SYNC_CMD_READ (1<<0)
117988c716a0Santirez #define SYNC_CMD_WRITE (1<<1)
118088c716a0Santirez #define SYNC_CMD_FULL (SYNC_CMD_READ|SYNC_CMD_WRITE)
sendSynchronousCommand(int flags,int fd,...)118188c716a0Santirez char *sendSynchronousCommand(int flags, int fd, ...) {
11823a328978Santirez 
11833a328978Santirez     /* Create the command to send to the master, we use simple inline
11843a328978Santirez      * protocol for simplicity as currently we only send simple strings. */
118588c716a0Santirez     if (flags & SYNC_CMD_WRITE) {
118688c716a0Santirez         char *arg;
118788c716a0Santirez         va_list ap;
118888c716a0Santirez         sds cmd = sdsempty();
11893a328978Santirez         va_start(ap,fd);
119088c716a0Santirez 
11913a328978Santirez         while(1) {
11923a328978Santirez             arg = va_arg(ap, char*);
11933a328978Santirez             if (arg == NULL) break;
11943a328978Santirez 
11953a328978Santirez             if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
11963a328978Santirez             cmd = sdscat(cmd,arg);
11973a328978Santirez         }
11983a328978Santirez         cmd = sdscatlen(cmd,"\r\n",2);
11993a328978Santirez 
12003a328978Santirez         /* Transfer command to the server. */
120188c716a0Santirez         if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000)
120288c716a0Santirez             == -1)
120388c716a0Santirez         {
12043a328978Santirez             sdsfree(cmd);
120507888202Santirez             return sdscatprintf(sdsempty(),"-Writing to master: %s",
12063a328978Santirez                     strerror(errno));
12073a328978Santirez         }
12083a328978Santirez         sdsfree(cmd);
120988c716a0Santirez         va_end(ap);
121088c716a0Santirez     }
12113a328978Santirez 
12123a328978Santirez     /* Read the reply from the server. */
121388c716a0Santirez     if (flags & SYNC_CMD_READ) {
121488c716a0Santirez         char buf[256];
121588c716a0Santirez 
121688c716a0Santirez         if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000)
121788c716a0Santirez             == -1)
12183a328978Santirez         {
121907888202Santirez             return sdscatprintf(sdsempty(),"-Reading from master: %s",
12203a328978Santirez                     strerror(errno));
12213a328978Santirez         }
1222e3344b80Santirez         server.repl_transfer_lastio = server.unixtime;
122307888202Santirez         return sdsnew(buf);
12243a328978Santirez     }
122588c716a0Santirez     return NULL;
122688c716a0Santirez }
12273a328978Santirez 
122807888202Santirez /* Try a partial resynchronization with the master if we are about to reconnect.
122907888202Santirez  * If there is no cached master structure, at least try to issue a
123007888202Santirez  * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
123107888202Santirez  * command in order to obtain the master run id and the master replication
123207888202Santirez  * global offset.
123307888202Santirez  *
123407888202Santirez  * This function is designed to be called from syncWithMaster(), so the
123507888202Santirez  * following assumptions are made:
123607888202Santirez  *
123707888202Santirez  * 1) We pass the function an already connected socket "fd".
123807888202Santirez  * 2) This function does not close the file descriptor "fd". However in case
123907888202Santirez  *    of successful partial resynchronization, the function will reuse
124007888202Santirez  *    'fd' as file descriptor of the server.master client structure.
124107888202Santirez  *
124288c716a0Santirez  * The function is split in two halves: if read_reply is 0, the function
124388c716a0Santirez  * writes the PSYNC command on the socket, and a new function call is
124488c716a0Santirez  * needed, with read_reply set to 1, in order to read the reply of the
124588c716a0Santirez  * command. This is useful in order to support non blocking operations, so
124688c716a0Santirez  * that we write, return into the event loop, and read when there are data.
124788c716a0Santirez  *
124888c716a0Santirez  * When read_reply is 0 the function returns PSYNC_WRITE_ERR if there
124988c716a0Santirez  * was a write error, or PSYNC_WAIT_REPLY to signal we need another call
125088c716a0Santirez  * with read_reply set to 1. However even when read_reply is set to 1
125188c716a0Santirez  * the function may return PSYNC_WAIT_REPLY again to signal there were
125288c716a0Santirez  * insufficient data to read to complete its work. We should re-enter
125388c716a0Santirez  * into the event loop and wait in such a case.
125488c716a0Santirez  *
125507888202Santirez  * The function returns:
125607888202Santirez  *
125707888202Santirez  * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue.
125807888202Santirez  * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
125907888202Santirez  *                   In this case the master run_id and global replication
126007888202Santirez  *                   offset is saved.
126107888202Santirez  * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
126207888202Santirez  *                      the caller should fall back to SYNC.
126388c716a0Santirez  * PSYNC_WRITE_ERR: There was an error writing the command to the socket.
126488c716a0Santirez  * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
1265bea12591Santirez  *
1266bea12591Santirez  * Notable side effects:
1267bea12591Santirez  *
1268bea12591Santirez  * 1) As a side effect of the function call the function removes the readable
1269bea12591Santirez  *    event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
1270bea12591Santirez  * 2) server.repl_master_initial_offset is set to the right value according
1271bea12591Santirez  *    to the master reply. This will be used to populate the 'server.master'
1272bea12591Santirez  *    structure replication offset.
127307888202Santirez  */
127407888202Santirez 
127588c716a0Santirez #define PSYNC_WRITE_ERROR 0
127688c716a0Santirez #define PSYNC_WAIT_REPLY 1
127788c716a0Santirez #define PSYNC_CONTINUE 2
127888c716a0Santirez #define PSYNC_FULLRESYNC 3
127988c716a0Santirez #define PSYNC_NOT_SUPPORTED 4
slaveTryPartialResynchronization(int fd,int read_reply)128088c716a0Santirez int slaveTryPartialResynchronization(int fd, int read_reply) {
128107888202Santirez     char *psync_runid;
128207888202Santirez     char psync_offset[32];
128307888202Santirez     sds reply;
128407888202Santirez 
128588c716a0Santirez     /* Writing half */
128688c716a0Santirez     if (!read_reply) {
128707888202Santirez         /* Initially set repl_master_initial_offset to -1 to mark the current
128807888202Santirez          * master run_id and offset as not valid. Later if we'll be able to do
128907888202Santirez          * a FULL resync using the PSYNC command we'll set the offset at the
129007888202Santirez          * right value, so that this information will be propagated to the
129107888202Santirez          * client structure representing the master into server.master. */
129207888202Santirez         server.repl_master_initial_offset = -1;
129307888202Santirez 
129407888202Santirez         if (server.cached_master) {
129507888202Santirez             psync_runid = server.cached_master->replrunid;
129607888202Santirez             snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
129732f80e2fSantirez             serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
129807888202Santirez         } else {
129932f80e2fSantirez             serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
130007888202Santirez             psync_runid = "?";
130107888202Santirez             memcpy(psync_offset,"-1",3);
130207888202Santirez         }
130307888202Santirez 
130407888202Santirez         /* Issue the PSYNC command */
130588c716a0Santirez         reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL);
130688c716a0Santirez         if (reply != NULL) {
130788c716a0Santirez             serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
130888c716a0Santirez             sdsfree(reply);
1309bea12591Santirez             aeDeleteFileEvent(server.el,fd,AE_READABLE);
131088c716a0Santirez             return PSYNC_WRITE_ERROR;
131188c716a0Santirez         }
131288c716a0Santirez         return PSYNC_WAIT_REPLY;
131388c716a0Santirez     }
131488c716a0Santirez 
131588c716a0Santirez     /* Reading half */
131688c716a0Santirez     reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
131788c716a0Santirez     if (sdslen(reply) == 0) {
131888c716a0Santirez         /* The master may send empty newlines after it receives PSYNC
131988c716a0Santirez          * and before to reply, just to keep the connection alive. */
132088c716a0Santirez         sdsfree(reply);
132188c716a0Santirez         return PSYNC_WAIT_REPLY;
132288c716a0Santirez     }
132307888202Santirez 
1324bea12591Santirez     aeDeleteFileEvent(server.el,fd,AE_READABLE);
1325bea12591Santirez 
132607888202Santirez     if (!strncmp(reply,"+FULLRESYNC",11)) {
132789b48f08Santirez         char *runid = NULL, *offset = NULL;
132807888202Santirez 
132907888202Santirez         /* FULL RESYNC, parse the reply in order to extract the run id
133007888202Santirez          * and the replication offset. */
133107888202Santirez         runid = strchr(reply,' ');
133207888202Santirez         if (runid) {
133307888202Santirez             runid++;
133407888202Santirez             offset = strchr(runid,' ');
133507888202Santirez             if (offset) offset++;
133607888202Santirez         }
133732f80e2fSantirez         if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) {
133832f80e2fSantirez             serverLog(LL_WARNING,
133907888202Santirez                 "Master replied with wrong +FULLRESYNC syntax.");
13400e1be534Santirez             /* This is an unexpected condition, actually the +FULLRESYNC
13410e1be534Santirez              * reply means that the master supports PSYNC, but the reply
13420e1be534Santirez              * format seems wrong. To stay safe we blank the master
1343072c91feSantirez              * runid to make sure next PSYNCs will fail. */
134432f80e2fSantirez             memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1);
134507888202Santirez         } else {
134607888202Santirez             memcpy(server.repl_master_runid, runid, offset-runid-1);
134732f80e2fSantirez             server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0';
134807888202Santirez             server.repl_master_initial_offset = strtoll(offset,NULL,10);
134932f80e2fSantirez             serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
135007888202Santirez                 server.repl_master_runid,
135107888202Santirez                 server.repl_master_initial_offset);
135207888202Santirez         }
135307888202Santirez         /* We are going to full resync, discard the cached master structure. */
135407888202Santirez         replicationDiscardCachedMaster();
135507888202Santirez         sdsfree(reply);
135607888202Santirez         return PSYNC_FULLRESYNC;
135707888202Santirez     }
135807888202Santirez 
135907888202Santirez     if (!strncmp(reply,"+CONTINUE",9)) {
136007888202Santirez         /* Partial resync was accepted, set the replication state accordingly */
136132f80e2fSantirez         serverLog(LL_NOTICE,
136207888202Santirez             "Successful partial resynchronization with master.");
136307888202Santirez         sdsfree(reply);
136407888202Santirez         replicationResurrectCachedMaster(fd);
136507888202Santirez         return PSYNC_CONTINUE;
136607888202Santirez     }
136707888202Santirez 
136888c716a0Santirez     /* If we reach this point we received either an error since the master does
136907888202Santirez      * not understand PSYNC, or an unexpected reply from the master.
13703f92e056Santirez      * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
137107888202Santirez 
137207888202Santirez     if (strncmp(reply,"-ERR",4)) {
137307888202Santirez         /* If it's not an error, log the unexpected event. */
137432f80e2fSantirez         serverLog(LL_WARNING,
137507888202Santirez             "Unexpected reply to PSYNC from master: %s", reply);
137607888202Santirez     } else {
137732f80e2fSantirez         serverLog(LL_NOTICE,
137807888202Santirez             "Master does not support PSYNC or is in "
137907888202Santirez             "error state (reply: %s)", reply);
138007888202Santirez     }
138107888202Santirez     sdsfree(reply);
138207888202Santirez     replicationDiscardCachedMaster();
138307888202Santirez     return PSYNC_NOT_SUPPORTED;
13843a328978Santirez }
13853a328978Santirez 
syncWithMaster(aeEventLoop * el,int fd,void * privdata,int mask)1386a3309139SPieter Noordhuis void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
138788c716a0Santirez     char tmpfile[256], *err = NULL;
1388e2641e09Santirez     int dfd, maxtries = 5;
138907888202Santirez     int sockerr = 0, psync_result;
1390bb66fc31Santirez     socklen_t errlen = sizeof(sockerr);
139132f80e2fSantirez     UNUSED(el);
139232f80e2fSantirez     UNUSED(privdata);
139332f80e2fSantirez     UNUSED(mask);
1394e2641e09Santirez 
139576e772f3Santirez     /* If this event fired after the user turned the instance into a master
139676e772f3Santirez      * with SLAVEOF NO ONE we must just return ASAP. */
139732f80e2fSantirez     if (server.repl_state == REPL_STATE_NONE) {
139876e772f3Santirez         close(fd);
139976e772f3Santirez         return;
140076e772f3Santirez     }
140176e772f3Santirez 
1402bb66fc31Santirez     /* Check for errors in the socket. */
1403bb66fc31Santirez     if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
1404bb66fc31Santirez         sockerr = errno;
1405bb66fc31Santirez     if (sockerr) {
140632f80e2fSantirez         serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
1407bb66fc31Santirez             strerror(sockerr));
1408bb66fc31Santirez         goto error;
1409bb66fc31Santirez     }
1410bb66fc31Santirez 
141188c716a0Santirez     /* Send a PING to check the master is able to reply without errors. */
141232f80e2fSantirez     if (server.repl_state == REPL_STATE_CONNECTING) {
141332f80e2fSantirez         serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
1414bb66fc31Santirez         /* Delete the writable event so that the readable event remains
1415bb66fc31Santirez          * registered and we can wait for the PONG reply. */
1416bb66fc31Santirez         aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
141732f80e2fSantirez         server.repl_state = REPL_STATE_RECEIVE_PONG;
1418bb66fc31Santirez         /* Send the PING, don't check for errors at all, we have the timeout
1419bb66fc31Santirez          * that will take care about this. */
142088c716a0Santirez         err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
142188c716a0Santirez         if (err) goto write_error;
1422bb66fc31Santirez         return;
1423bb66fc31Santirez     }
1424bb66fc31Santirez 
1425bb66fc31Santirez     /* Receive the PONG command. */
142632f80e2fSantirez     if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
142788c716a0Santirez         err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
1428bb66fc31Santirez 
14293419c8ceSantirez         /* We accept only two replies as valid, a positive +PONG reply
14303419c8ceSantirez          * (we just check for "+") or an authentication error.
14313419c8ceSantirez          * Note that older versions of Redis replied with "operation not
14323419c8ceSantirez          * permitted" instead of using a proper error code, so we test
14333419c8ceSantirez          * both. */
143488c716a0Santirez         if (err[0] != '+' &&
143588c716a0Santirez             strncmp(err,"-NOAUTH",7) != 0 &&
143688c716a0Santirez             strncmp(err,"-ERR operation not permitted",28) != 0)
14373419c8ceSantirez         {
143888c716a0Santirez             serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
143988c716a0Santirez             sdsfree(err);
1440bb66fc31Santirez             goto error;
1441bb66fc31Santirez         } else {
144232f80e2fSantirez             serverLog(LL_NOTICE,
1443bb66fc31Santirez                 "Master replied to PING, replication can continue...");
1444bb66fc31Santirez         }
144588c716a0Santirez         sdsfree(err);
144688c716a0Santirez         server.repl_state = REPL_STATE_SEND_AUTH;
1447bb66fc31Santirez     }
1448e2641e09Santirez 
1449e2641e09Santirez     /* AUTH with the master if required. */
145088c716a0Santirez     if (server.repl_state == REPL_STATE_SEND_AUTH) {
1451e2641e09Santirez         if (server.masterauth) {
145288c716a0Santirez             err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
145388c716a0Santirez             if (err) goto write_error;
145488c716a0Santirez             server.repl_state = REPL_STATE_RECEIVE_AUTH;
145588c716a0Santirez             return;
145688c716a0Santirez         } else {
145788c716a0Santirez             server.repl_state = REPL_STATE_SEND_PORT;
145888c716a0Santirez         }
145988c716a0Santirez     }
146088c716a0Santirez 
146188c716a0Santirez     /* Receive AUTH reply. */
146288c716a0Santirez     if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
146388c716a0Santirez         err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
146407888202Santirez         if (err[0] == '-') {
146532f80e2fSantirez             serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
14663a328978Santirez             sdsfree(err);
14673a328978Santirez             goto error;
14683a328978Santirez         }
146907888202Santirez         sdsfree(err);
147088c716a0Santirez         server.repl_state = REPL_STATE_SEND_PORT;
14713a328978Santirez     }
1472a3309139SPieter Noordhuis 
14733a328978Santirez     /* Set the slave port, so that Master's INFO command can list the
14743a328978Santirez      * slave listening port correctly. */
147588c716a0Santirez     if (server.repl_state == REPL_STATE_SEND_PORT) {
14760a45fbc3Santirez         sds port = sdsfromlonglong(server.slave_announce_port ?
14770a45fbc3Santirez             server.slave_announce_port : server.port);
147888c716a0Santirez         err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
147988c716a0Santirez                 "listening-port",port, NULL);
14803a328978Santirez         sdsfree(port);
148188c716a0Santirez         if (err) goto write_error;
148288c716a0Santirez         sdsfree(err);
148388c716a0Santirez         server.repl_state = REPL_STATE_RECEIVE_PORT;
148488c716a0Santirez         return;
148588c716a0Santirez     }
148688c716a0Santirez 
148788c716a0Santirez     /* Receive REPLCONF listening-port reply. */
148888c716a0Santirez     if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
148988c716a0Santirez         err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
14903a328978Santirez         /* Ignore the error if any, not all the Redis versions support
14913a328978Santirez          * REPLCONF listening-port. */
149207888202Santirez         if (err[0] == '-') {
14933e6d4d59Santirez             serverLog(LL_NOTICE,"(Non critical) Master does not understand "
14943e6d4d59Santirez                                 "REPLCONF listening-port: %s", err);
14953e6d4d59Santirez         }
14963e6d4d59Santirez         sdsfree(err);
14970a45fbc3Santirez         server.repl_state = REPL_STATE_SEND_IP;
14980a45fbc3Santirez     }
14990a45fbc3Santirez 
15000a45fbc3Santirez     /* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
15010a45fbc3Santirez     if (server.repl_state == REPL_STATE_SEND_IP &&
15020a45fbc3Santirez         server.slave_announce_ip == NULL)
15030a45fbc3Santirez     {
15040a45fbc3Santirez             server.repl_state = REPL_STATE_SEND_CAPA;
15050a45fbc3Santirez     }
15060a45fbc3Santirez 
15070a45fbc3Santirez     /* Set the slave ip, so that Master's INFO command can list the
15080a45fbc3Santirez      * slave IP address port correctly in case of port forwarding or NAT. */
15090a45fbc3Santirez     if (server.repl_state == REPL_STATE_SEND_IP) {
15100a45fbc3Santirez         err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
15110a45fbc3Santirez                 "ip-address",server.slave_announce_ip, NULL);
15120a45fbc3Santirez         if (err) goto write_error;
15130a45fbc3Santirez         sdsfree(err);
15140a45fbc3Santirez         server.repl_state = REPL_STATE_RECEIVE_IP;
15150a45fbc3Santirez         return;
15160a45fbc3Santirez     }
15170a45fbc3Santirez 
15180a45fbc3Santirez     /* Receive REPLCONF ip-address reply. */
15190a45fbc3Santirez     if (server.repl_state == REPL_STATE_RECEIVE_IP) {
15200a45fbc3Santirez         err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
15210a45fbc3Santirez         /* Ignore the error if any, not all the Redis versions support
15220a45fbc3Santirez          * REPLCONF listening-port. */
15230a45fbc3Santirez         if (err[0] == '-') {
15240a45fbc3Santirez             serverLog(LL_NOTICE,"(Non critical) Master does not understand "
15250a45fbc3Santirez                                 "REPLCONF ip-address: %s", err);
15260a45fbc3Santirez         }
15270a45fbc3Santirez         sdsfree(err);
152888c716a0Santirez         server.repl_state = REPL_STATE_SEND_CAPA;
15293e6d4d59Santirez     }
15303e6d4d59Santirez 
15313e6d4d59Santirez     /* Inform the master of our capabilities. While we currently send
15323e6d4d59Santirez      * just one capability, it is possible to chain new capabilities here
15333e6d4d59Santirez      * in the form of REPLCONF capa X capa Y capa Z ...
15343e6d4d59Santirez      * The master will ignore capabilities it does not understand. */
153588c716a0Santirez     if (server.repl_state == REPL_STATE_SEND_CAPA) {
153688c716a0Santirez         err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
153788c716a0Santirez                 "capa","eof",NULL);
153888c716a0Santirez         if (err) goto write_error;
153988c716a0Santirez         sdsfree(err);
154088c716a0Santirez         server.repl_state = REPL_STATE_RECEIVE_CAPA;
154188c716a0Santirez         return;
154288c716a0Santirez     }
15433e6d4d59Santirez 
154488c716a0Santirez     /* Receive CAPA reply. */
154588c716a0Santirez     if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
154688c716a0Santirez         err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
15473e6d4d59Santirez         /* Ignore the error if any, not all the Redis versions support
15483e6d4d59Santirez          * REPLCONF capa. */
15493e6d4d59Santirez         if (err[0] == '-') {
15503e6d4d59Santirez             serverLog(LL_NOTICE,"(Non critical) Master does not understand "
15513e6d4d59Santirez                                   "REPLCONF capa: %s", err);
155207888202Santirez         }
15533a328978Santirez         sdsfree(err);
155488c716a0Santirez         server.repl_state = REPL_STATE_SEND_PSYNC;
1555e2641e09Santirez     }
155607888202Santirez 
155707888202Santirez     /* Try a partial resynchonization. If we don't have a cached master
155807888202Santirez      * slaveTryPartialResynchronization() will at least try to use PSYNC
155907888202Santirez      * to start a full resynchronization so that we get the master run id
156007888202Santirez      * and the global offset, to try a partial resync at the next
156107888202Santirez      * reconnection attempt. */
156288c716a0Santirez     if (server.repl_state == REPL_STATE_SEND_PSYNC) {
156388c716a0Santirez         if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
156488c716a0Santirez             err = sdsnew("Write error sending the PSYNC command.");
156588c716a0Santirez             goto write_error;
156688c716a0Santirez         }
156788c716a0Santirez         server.repl_state = REPL_STATE_RECEIVE_PSYNC;
156888c716a0Santirez         return;
156988c716a0Santirez     }
157088c716a0Santirez 
157188c716a0Santirez     /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
157288c716a0Santirez     if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
157388c716a0Santirez         serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
157488c716a0Santirez                              "state should be RECEIVE_PSYNC but is %d",
157588c716a0Santirez                              server.repl_state);
157688c716a0Santirez         goto error;
157788c716a0Santirez     }
157888c716a0Santirez 
157988c716a0Santirez     psync_result = slaveTryPartialResynchronization(fd,1);
1580bea12591Santirez     if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
1581bea12591Santirez 
1582bea12591Santirez     /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
1583bea12591Santirez      * uninstalling the read handler from the file descriptor. */
158488c716a0Santirez 
158507888202Santirez     if (psync_result == PSYNC_CONTINUE) {
158632f80e2fSantirez         serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
158707888202Santirez         return;
1588e2641e09Santirez     }
1589e2641e09Santirez 
1590c1e94b6bSantirez     /* PSYNC failed or is not supported: we want our slaves to resync with us
1591c1e94b6bSantirez      * as well, if we have any (chained replication case). The mater may
1592c1e94b6bSantirez      * transfer us an entirely different data set and we have no way to
1593c1e94b6bSantirez      * incrementally feed our slaves after that. */
1594c1e94b6bSantirez     disconnectSlaves(); /* Force our slaves to resync with us as well. */
1595c1e94b6bSantirez     freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
1596c1e94b6bSantirez 
159707888202Santirez     /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
159807888202Santirez      * and the server.repl_master_runid and repl_master_initial_offset are
159907888202Santirez      * already populated. */
160007888202Santirez     if (psync_result == PSYNC_NOT_SUPPORTED) {
160132f80e2fSantirez         serverLog(LL_NOTICE,"Retrying with SYNC...");
1602299290d3Santirez         if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
160332f80e2fSantirez             serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
1604e2641e09Santirez                 strerror(errno));
1605a3309139SPieter Noordhuis             goto error;
1606e2641e09Santirez         }
160707888202Santirez     }
160826b33669Santirez 
160926b33669Santirez     /* Prepare a suitable temp file for bulk transfer */
1610e2641e09Santirez     while(maxtries--) {
1611e2641e09Santirez         snprintf(tmpfile,256,
1612d1949054SPremysl Hruby             "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
1613e2641e09Santirez         dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
1614e2641e09Santirez         if (dfd != -1) break;
1615e2641e09Santirez         sleep(1);
1616e2641e09Santirez     }
1617e2641e09Santirez     if (dfd == -1) {
161832f80e2fSantirez         serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
1619a3309139SPieter Noordhuis         goto error;
1620e2641e09Santirez     }
1621e2641e09Santirez 
1622f4aa600bSantirez     /* Setup the non blocking download of the bulk file. */
162362ec599cSantirez     if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
162462ec599cSantirez             == AE_ERR)
1625f4aa600bSantirez     {
162632f80e2fSantirez         serverLog(LL_WARNING,
1627d7740fc8Santirez             "Can't create readable event for SYNC: %s (fd=%d)",
1628d7740fc8Santirez             strerror(errno),fd);
1629a3309139SPieter Noordhuis         goto error;
1630a3309139SPieter Noordhuis     }
1631a3309139SPieter Noordhuis 
163232f80e2fSantirez     server.repl_state = REPL_STATE_TRANSFER;
1633784b9308Santirez     server.repl_transfer_size = -1;
1634784b9308Santirez     server.repl_transfer_read = 0;
1635784b9308Santirez     server.repl_transfer_last_fsync_off = 0;
1636a3309139SPieter Noordhuis     server.repl_transfer_fd = dfd;
1637d1949054SPremysl Hruby     server.repl_transfer_lastio = server.unixtime;
1638a3309139SPieter Noordhuis     server.repl_transfer_tmpfile = zstrdup(tmpfile);
1639a3309139SPieter Noordhuis     return;
1640a3309139SPieter Noordhuis 
1641a3309139SPieter Noordhuis error:
1642bea12591Santirez     aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
1643a3309139SPieter Noordhuis     close(fd);
1644bb66fc31Santirez     server.repl_transfer_s = -1;
164532f80e2fSantirez     server.repl_state = REPL_STATE_CONNECT;
1646a3309139SPieter Noordhuis     return;
164788c716a0Santirez 
164888c716a0Santirez write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */
164988c716a0Santirez     serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
165088c716a0Santirez     sdsfree(err);
165188c716a0Santirez     goto error;
1652a3309139SPieter Noordhuis }
1653a3309139SPieter Noordhuis 
connectWithMaster(void)1654a3309139SPieter Noordhuis int connectWithMaster(void) {
1655a3309139SPieter Noordhuis     int fd;
1656a3309139SPieter Noordhuis 
16578366907bSantirez     fd = anetTcpNonBlockBestEffortBindConnect(NULL,
165832f80e2fSantirez         server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
1659a3309139SPieter Noordhuis     if (fd == -1) {
166032f80e2fSantirez         serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
1661a3309139SPieter Noordhuis             strerror(errno));
166240eb548aSantirez         return C_ERR;
1663a3309139SPieter Noordhuis     }
1664a3309139SPieter Noordhuis 
166545029d37Santirez     if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
1666b075621fSPieter Noordhuis             AE_ERR)
1667a3309139SPieter Noordhuis     {
1668e2641e09Santirez         close(fd);
166932f80e2fSantirez         serverLog(LL_WARNING,"Can't create readable event for SYNC");
167040eb548aSantirez         return C_ERR;
1671e2641e09Santirez     }
1672a3309139SPieter Noordhuis 
1673d1949054SPremysl Hruby     server.repl_transfer_lastio = server.unixtime;
1674f4aa600bSantirez     server.repl_transfer_s = fd;
167532f80e2fSantirez     server.repl_state = REPL_STATE_CONNECTING;
167640eb548aSantirez     return C_OK;
1677e2641e09Santirez }
1678e2641e09Santirez 
167927acd7aaSantirez /* This function can be called when a non blocking connection is currently
1680c5f8c80aSantirez  * in progress to undo it.
1681c5f8c80aSantirez  * Never call this function directly, use cancelReplicationHandshake() instead.
1682c5f8c80aSantirez  */
undoConnectWithMaster(void)168327acd7aaSantirez void undoConnectWithMaster(void) {
168427acd7aaSantirez     int fd = server.repl_transfer_s;
168527acd7aaSantirez 
168627acd7aaSantirez     aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
168727acd7aaSantirez     close(fd);
168827acd7aaSantirez     server.repl_transfer_s = -1;
1689c5f8c80aSantirez }
1690c5f8c80aSantirez 
1691c5f8c80aSantirez /* Abort the async download of the bulk dataset while SYNC-ing with master.
1692c5f8c80aSantirez  * Never call this function directly, use cancelReplicationHandshake() instead.
1693c5f8c80aSantirez  */
replicationAbortSyncTransfer(void)1694c5f8c80aSantirez void replicationAbortSyncTransfer(void) {
1695c5f8c80aSantirez     serverAssert(server.repl_state == REPL_STATE_TRANSFER);
1696c5f8c80aSantirez     undoConnectWithMaster();
1697c5f8c80aSantirez     close(server.repl_transfer_fd);
1698c5f8c80aSantirez     unlink(server.repl_transfer_tmpfile);
1699c5f8c80aSantirez     zfree(server.repl_transfer_tmpfile);
170027acd7aaSantirez }
170127acd7aaSantirez 
1702ef99e146Santirez /* This function aborts a non blocking replication attempt if there is one
1703ef99e146Santirez  * in progress, by canceling the non-blocking connect attempt or
1704ef99e146Santirez  * the initial bulk transfer.
1705ef99e146Santirez  *
1706ef99e146Santirez  * If there was a replication handshake in progress 1 is returned and
170732f80e2fSantirez  * the replication state (server.repl_state) set to REPL_STATE_CONNECT.
1708ef99e146Santirez  *
1709ef99e146Santirez  * Otherwise zero is returned and no operation is perforemd at all. */
cancelReplicationHandshake(void)1710ef99e146Santirez int cancelReplicationHandshake(void) {
171132f80e2fSantirez     if (server.repl_state == REPL_STATE_TRANSFER) {
1712ef99e146Santirez         replicationAbortSyncTransfer();
1713c5f8c80aSantirez         server.repl_state = REPL_STATE_CONNECT;
171432f80e2fSantirez     } else if (server.repl_state == REPL_STATE_CONNECTING ||
1715e3344b80Santirez                slaveIsInHandshakeState())
1716ef99e146Santirez     {
1717ef99e146Santirez         undoConnectWithMaster();
1718c5f8c80aSantirez         server.repl_state = REPL_STATE_CONNECT;
1719ef99e146Santirez     } else {
1720ef99e146Santirez         return 0;
1721ef99e146Santirez     }
1722ef99e146Santirez     return 1;
1723ef99e146Santirez }
1724ef99e146Santirez 
17257bead003Santirez /* Set replication to the specified master address and port. */
replicationSetMaster(char * ip,int port)17267bead003Santirez void replicationSetMaster(char *ip, int port) {
17277bead003Santirez     sdsfree(server.masterhost);
17283be89312Santirez     server.masterhost = sdsnew(ip);
17297bead003Santirez     server.masterport = port;
17307bead003Santirez     if (server.master) freeClient(server.master);
1731c3ad7090Santirez     disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
17327bead003Santirez     disconnectSlaves(); /* Force our slaves to resync with us as well. */
17337bead003Santirez     replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
17347bead003Santirez     freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
17357bead003Santirez     cancelReplicationHandshake();
173632f80e2fSantirez     server.repl_state = REPL_STATE_CONNECT;
173794e8c9e7Santirez     server.master_repl_offset = 0;
1738abd6308dSantirez     server.repl_down_since = 0;
17397bead003Santirez }
17407bead003Santirez 
17417bead003Santirez /* Cancel replication, setting the instance as a master itself. */
replicationUnsetMaster(void)17427bead003Santirez void replicationUnsetMaster(void) {
17437bead003Santirez     if (server.masterhost == NULL) return; /* Nothing to do. */
1744e2641e09Santirez     sdsfree(server.masterhost);
1745e2641e09Santirez     server.masterhost = NULL;
174694e8c9e7Santirez     if (server.master) {
174794e8c9e7Santirez         if (listLength(server.slaves) == 0) {
174894e8c9e7Santirez             /* If this instance is turned into a master and there are no
174994e8c9e7Santirez              * slaves, it inherits the replication offset from the master.
175094e8c9e7Santirez              * Under certain conditions this makes replicas comparable by
175194e8c9e7Santirez              * replication offset to understand what is the most updated. */
175294e8c9e7Santirez             server.master_repl_offset = server.master->reploff;
175394e8c9e7Santirez             freeReplicationBacklog();
175494e8c9e7Santirez         }
175594e8c9e7Santirez         freeClient(server.master);
175694e8c9e7Santirez     }
175707888202Santirez     replicationDiscardCachedMaster();
1758ef99e146Santirez     cancelReplicationHandshake();
175932f80e2fSantirez     server.repl_state = REPL_STATE_NONE;
17607bead003Santirez }
17617bead003Santirez 
1762278ea9d1Santirez /* This function is called when the slave lose the connection with the
1763278ea9d1Santirez  * master into an unexpected way. */
replicationHandleMasterDisconnection(void)1764278ea9d1Santirez void replicationHandleMasterDisconnection(void) {
1765278ea9d1Santirez     server.master = NULL;
1766278ea9d1Santirez     server.repl_state = REPL_STATE_CONNECT;
1767278ea9d1Santirez     server.repl_down_since = server.unixtime;
1768c1e94b6bSantirez     /* We lost connection with our master, don't disconnect slaves yet,
1769c1e94b6bSantirez      * maybe we'll be able to PSYNC with our master later. We'll disconnect
1770c1e94b6bSantirez      * the slaves only if we'll have to do a full resync with our master. */
1771278ea9d1Santirez }
1772278ea9d1Santirez 
slaveofCommand(client * c)1773554bd0e7Santirez void slaveofCommand(client *c) {
1774b7d085fcSantirez     /* SLAVEOF is not allowed in cluster mode as replication is automatically
1775b7d085fcSantirez      * configured using the current address of the master node. */
1776b7d085fcSantirez     if (server.cluster_enabled) {
1777b7d085fcSantirez         addReplyError(c,"SLAVEOF not allowed in cluster mode.");
1778b7d085fcSantirez         return;
1779b7d085fcSantirez     }
1780b7d085fcSantirez 
1781b7d085fcSantirez     /* The special host/port combination "NO" "ONE" turns the instance
1782b7d085fcSantirez      * into a master. Otherwise the new master address is set. */
17837bead003Santirez     if (!strcasecmp(c->argv[1]->ptr,"no") &&
17847bead003Santirez         !strcasecmp(c->argv[2]->ptr,"one")) {
17857bead003Santirez         if (server.masterhost) {
17867bead003Santirez             replicationUnsetMaster();
1787d036abe2Santirez             sds client = catClientInfoString(sdsempty(),c);
1788d036abe2Santirez             serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
1789d036abe2Santirez                 client);
1790d036abe2Santirez             sdsfree(client);
1791e2641e09Santirez         }
1792e2641e09Santirez     } else {
1793ebdfad69Santirez         long port;
1794ebdfad69Santirez 
179540eb548aSantirez         if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
1796ebdfad69Santirez             return;
1797ebdfad69Santirez 
1798ebdfad69Santirez         /* Check if we are already attached to the specified slave */
1799ebdfad69Santirez         if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
1800ebdfad69Santirez             && server.masterport == port) {
180132f80e2fSantirez             serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
1802ebdfad69Santirez             addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
1803ebdfad69Santirez             return;
1804ebdfad69Santirez         }
1805ebdfad69Santirez         /* There was no previous master or the user specified a different one,
1806ebdfad69Santirez          * we can continue. */
18077bead003Santirez         replicationSetMaster(c->argv[1]->ptr, port);
1808d036abe2Santirez         sds client = catClientInfoString(sdsempty(),c);
1809d036abe2Santirez         serverLog(LL_NOTICE,"SLAVE OF %s:%d enabled (user request from '%s')",
1810d036abe2Santirez             server.masterhost, server.masterport, client);
1811d036abe2Santirez         sdsfree(client);
1812e2641e09Santirez     }
1813e2641e09Santirez     addReply(c,shared.ok);
1814e2641e09Santirez }
1815f4aa600bSantirez 
1816d34c2fa3Santirez /* ROLE command: provide information about the role of the instance
1817d34c2fa3Santirez  * (master or slave) and additional information related to replication
1818d34c2fa3Santirez  * in an easy to process format. */
roleCommand(client * c)1819554bd0e7Santirez void roleCommand(client *c) {
1820d34c2fa3Santirez     if (server.masterhost == NULL) {
1821d34c2fa3Santirez         listIter li;
1822d34c2fa3Santirez         listNode *ln;
1823d34c2fa3Santirez         void *mbcount;
1824d34c2fa3Santirez         int slaves = 0;
1825d34c2fa3Santirez 
1826d34c2fa3Santirez         addReplyMultiBulkLen(c,3);
1827d34c2fa3Santirez         addReplyBulkCBuffer(c,"master",6);
1828d34c2fa3Santirez         addReplyLongLong(c,server.master_repl_offset);
1829d34c2fa3Santirez         mbcount = addDeferredMultiBulkLength(c);
1830d34c2fa3Santirez         listRewind(server.slaves,&li);
1831d34c2fa3Santirez         while((ln = listNext(&li))) {
1832554bd0e7Santirez             client *slave = ln->value;
18330a45fbc3Santirez             char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
1834d34c2fa3Santirez 
18350a45fbc3Santirez             if (slaveip[0] == '\0') {
18360a45fbc3Santirez                 if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1)
18370a45fbc3Santirez                     continue;
18380a45fbc3Santirez                 slaveip = ip;
18390a45fbc3Santirez             }
184032f80e2fSantirez             if (slave->replstate != SLAVE_STATE_ONLINE) continue;
1841d34c2fa3Santirez             addReplyMultiBulkLen(c,3);
18420a45fbc3Santirez             addReplyBulkCString(c,slaveip);
1843d34c2fa3Santirez             addReplyBulkLongLong(c,slave->slave_listening_port);
1844d34c2fa3Santirez             addReplyBulkLongLong(c,slave->repl_ack_off);
1845d34c2fa3Santirez             slaves++;
1846d34c2fa3Santirez         }
1847d34c2fa3Santirez         setDeferredMultiBulkLength(c,mbcount,slaves);
1848d34c2fa3Santirez     } else {
18496a13193dSantirez         char *slavestate = NULL;
18506a13193dSantirez 
18517970d539Santirez         addReplyMultiBulkLen(c,5);
1852d34c2fa3Santirez         addReplyBulkCBuffer(c,"slave",5);
1853d34c2fa3Santirez         addReplyBulkCString(c,server.masterhost);
1854d34c2fa3Santirez         addReplyLongLong(c,server.masterport);
1855e3344b80Santirez         if (slaveIsInHandshakeState()) {
1856e3344b80Santirez             slavestate = "handshake";
1857e3344b80Santirez         } else {
18586a13193dSantirez             switch(server.repl_state) {
185932f80e2fSantirez             case REPL_STATE_NONE: slavestate = "none"; break;
186032f80e2fSantirez             case REPL_STATE_CONNECT: slavestate = "connect"; break;
186132f80e2fSantirez             case REPL_STATE_CONNECTING: slavestate = "connecting"; break;
186232f80e2fSantirez             case REPL_STATE_TRANSFER: slavestate = "sync"; break;
186332f80e2fSantirez             case REPL_STATE_CONNECTED: slavestate = "connected"; break;
18646a13193dSantirez             default: slavestate = "unknown"; break;
18656a13193dSantirez             }
1866e3344b80Santirez         }
18676a13193dSantirez         addReplyBulkCString(c,slavestate);
18686a13193dSantirez         addReplyLongLong(c,server.master ? server.master->reploff : -1);
1869d34c2fa3Santirez     }
1870d34c2fa3Santirez }
1871d34c2fa3Santirez 
1872efd87031Santirez /* Send a REPLCONF ACK command to the master to inform it about the current
1873efd87031Santirez  * processed offset. If we are not connected with a master, the command has
1874efd87031Santirez  * no effects. */
replicationSendAck(void)1875efd87031Santirez void replicationSendAck(void) {
1876554bd0e7Santirez     client *c = server.master;
1877efd87031Santirez 
1878efd87031Santirez     if (c != NULL) {
187932f80e2fSantirez         c->flags |= CLIENT_MASTER_FORCE_REPLY;
1880efd87031Santirez         addReplyMultiBulkLen(c,3);
1881efd87031Santirez         addReplyBulkCString(c,"REPLCONF");
1882efd87031Santirez         addReplyBulkCString(c,"ACK");
1883efd87031Santirez         addReplyBulkLongLong(c,c->reploff);
188432f80e2fSantirez         c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
1885efd87031Santirez     }
1886efd87031Santirez }
1887efd87031Santirez 
188807888202Santirez /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
188907888202Santirez 
189007888202Santirez /* In order to implement partial synchronization we need to be able to cache
189107888202Santirez  * our master's client structure after a transient disconnection.
189207888202Santirez  * It is cached into server.cached_master and flushed away using the following
189307888202Santirez  * functions. */
189407888202Santirez 
189507888202Santirez /* This function is called by freeClient() in order to cache the master
189607888202Santirez  * client structure instead of destryoing it. freeClient() will return
189707888202Santirez  * ASAP after this function returns, so every action needed to avoid problems
189807888202Santirez  * with a client that is really "suspended" has to be done by this function.
189907888202Santirez  *
190007888202Santirez  * The other functions that will deal with the cached master are:
190107888202Santirez  *
190207888202Santirez  * replicationDiscardCachedMaster() that will make sure to kill the client
190307888202Santirez  * as for some reason we don't want to use it in the future.
190407888202Santirez  *
190507888202Santirez  * replicationResurrectCachedMaster() that is used after a successful PSYNC
190607888202Santirez  * handshake in order to reactivate the cached master.
190707888202Santirez  */
replicationCacheMaster(client * c)1908554bd0e7Santirez void replicationCacheMaster(client *c) {
19092d9e3eb1Santirez     serverAssert(server.master != NULL && server.cached_master == NULL);
191032f80e2fSantirez     serverLog(LL_NOTICE,"Caching the disconnected master state.");
191107888202Santirez 
191247e6cf11Santirez     /* Unlink the client from the server structures. */
191347e6cf11Santirez     unlinkClient(c);
191423e7710cSantirez 
191507888202Santirez     /* Save the master. Server.master will be set to null later by
191607888202Santirez      * replicationHandleMasterDisconnection(). */
191707888202Santirez     server.cached_master = server.master;
191807888202Santirez 
19190bcc7cb4Santirez     /* Invalidate the Peer ID cache. */
19200bcc7cb4Santirez     if (c->peerid) {
19210bcc7cb4Santirez         sdsfree(c->peerid);
19220bcc7cb4Santirez         c->peerid = NULL;
19230bcc7cb4Santirez     }
19240bcc7cb4Santirez 
192507888202Santirez     /* Caching the master happens instead of the actual freeClient() call,
192607888202Santirez      * so make sure to adjust the replication state. This function will
192707888202Santirez      * also set server.master to NULL. */
192807888202Santirez     replicationHandleMasterDisconnection();
192907888202Santirez }
193007888202Santirez 
193107888202Santirez /* Free a cached master, called when there are no longer the conditions for
193207888202Santirez  * a partial resync on reconnection. */
replicationDiscardCachedMaster(void)193307888202Santirez void replicationDiscardCachedMaster(void) {
193407888202Santirez     if (server.cached_master == NULL) return;
193507888202Santirez 
193632f80e2fSantirez     serverLog(LL_NOTICE,"Discarding previously cached master state.");
193732f80e2fSantirez     server.cached_master->flags &= ~CLIENT_MASTER;
193807888202Santirez     freeClient(server.cached_master);
193907888202Santirez     server.cached_master = NULL;
194007888202Santirez }
194107888202Santirez 
194207888202Santirez /* Turn the cached master into the current master, using the file descriptor
194307888202Santirez  * passed as argument as the socket for the new master.
194407888202Santirez  *
19453a82b8acSAaron Rutkovsky  * This function is called when successfully setup a partial resynchronization
194607888202Santirez  * so the stream of data that we'll receive will start from were this
194707888202Santirez  * master left. */
replicationResurrectCachedMaster(int newfd)194807888202Santirez void replicationResurrectCachedMaster(int newfd) {
194907888202Santirez     server.master = server.cached_master;
195007888202Santirez     server.cached_master = NULL;
195107888202Santirez     server.master->fd = newfd;
195232f80e2fSantirez     server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
195307888202Santirez     server.master->authenticated = 1;
195407888202Santirez     server.master->lastinteraction = server.unixtime;
195532f80e2fSantirez     server.repl_state = REPL_STATE_CONNECTED;
195607888202Santirez 
195707888202Santirez     /* Re-add to the list of clients. */
195807888202Santirez     listAddNodeTail(server.clients,server.master);
195907888202Santirez     if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
196007888202Santirez                           readQueryFromClient, server.master)) {
196132f80e2fSantirez         serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
196207888202Santirez         freeClientAsync(server.master); /* Close ASAP. */
196307888202Santirez     }
19641461422cSantirez 
19651461422cSantirez     /* We may also need to install the write handler as well if there is
19661461422cSantirez      * pending data in the write buffers. */
19672d21af45Santirez     if (clientHasPendingReplies(server.master)) {
19681461422cSantirez         if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
19691461422cSantirez                           sendReplyToClient, server.master)) {
197032f80e2fSantirez             serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
19711461422cSantirez             freeClientAsync(server.master); /* Close ASAP. */
19721461422cSantirez         }
19731461422cSantirez     }
197407888202Santirez }
197507888202Santirez 
1976ed599d3aSantirez /* ------------------------- MIN-SLAVES-TO-WRITE  --------------------------- */
1977ed599d3aSantirez 
1978ed599d3aSantirez /* This function counts the number of slaves with lag <= min-slaves-max-lag.
1979ed599d3aSantirez  * If the option is active, the server will prevent writes if there are not
1980ed599d3aSantirez  * enough connected slaves with the specified lag (or less). */
refreshGoodSlavesCount(void)1981ed599d3aSantirez void refreshGoodSlavesCount(void) {
1982ed599d3aSantirez     listIter li;
1983ed599d3aSantirez     listNode *ln;
1984ed599d3aSantirez     int good = 0;
1985ed599d3aSantirez 
1986ed599d3aSantirez     if (!server.repl_min_slaves_to_write ||
1987ed599d3aSantirez         !server.repl_min_slaves_max_lag) return;
1988ed599d3aSantirez 
1989ed599d3aSantirez     listRewind(server.slaves,&li);
1990ed599d3aSantirez     while((ln = listNext(&li))) {
1991554bd0e7Santirez         client *slave = ln->value;
1992ed599d3aSantirez         time_t lag = server.unixtime - slave->repl_ack_time;
1993ed599d3aSantirez 
199432f80e2fSantirez         if (slave->replstate == SLAVE_STATE_ONLINE &&
1995ed599d3aSantirez             lag <= server.repl_min_slaves_max_lag) good++;
1996ed599d3aSantirez     }
1997ed599d3aSantirez     server.repl_good_slaves_count = good;
1998ed599d3aSantirez }
1999ed599d3aSantirez 
200094ec7db4Santirez /* ----------------------- REPLICATION SCRIPT CACHE --------------------------
200194ec7db4Santirez  * The goal of this code is to keep track of scripts already sent to every
200294ec7db4Santirez  * connected slave, in order to be able to replicate EVALSHA as it is without
200394ec7db4Santirez  * translating it to EVAL every time it is possible.
200494ec7db4Santirez  *
200511e81a1eSantirez  * We use a capped collection implemented by a hash table for fast lookup
200694ec7db4Santirez  * of scripts we can send as EVALSHA, plus a linked list that is used for
200794ec7db4Santirez  * eviction of the oldest entry when the max number of items is reached.
200894ec7db4Santirez  *
200994ec7db4Santirez  * We don't care about taking a different cache for every different slave
201094ec7db4Santirez  * since to fill the cache again is not very costly, the goal of this code
201194ec7db4Santirez  * is to avoid that the same big script is trasmitted a big number of times
201294ec7db4Santirez  * per second wasting bandwidth and processor speed, but it is not a problem
201394ec7db4Santirez  * if we need to rebuild the cache from scratch from time to time, every used
201494ec7db4Santirez  * script will need to be transmitted a single time to reappear in the cache.
201594ec7db4Santirez  *
201694ec7db4Santirez  * This is how the system works:
201794ec7db4Santirez  *
201894ec7db4Santirez  * 1) Every time a new slave connects, we flush the whole script cache.
201994ec7db4Santirez  * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
202094ec7db4Santirez  *    trying to convert EVAL into EVALSHA specifically for slaves.
202194ec7db4Santirez  * 3) Every time we trasmit a script as EVAL to the slaves, we also add the
202294ec7db4Santirez  *    corresponding SHA1 of the script into the cache as we are sure every
202394ec7db4Santirez  *    slave knows about the script starting from now.
202494ec7db4Santirez  * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves
202594ec7db4Santirez  *    and at the same time flush the script cache.
202694ec7db4Santirez  * 5) When the last slave disconnects, flush the cache.
202794ec7db4Santirez  * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded
202894ec7db4Santirez  *    in the master sometimes.
202994ec7db4Santirez  */
203094ec7db4Santirez 
203194ec7db4Santirez /* Initialize the script cache, only called at startup. */
replicationScriptCacheInit(void)203294ec7db4Santirez void replicationScriptCacheInit(void) {
203394ec7db4Santirez     server.repl_scriptcache_size = 10000;
203494ec7db4Santirez     server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL);
203594ec7db4Santirez     server.repl_scriptcache_fifo = listCreate();
203694ec7db4Santirez }
203794ec7db4Santirez 
203894ec7db4Santirez /* Empty the script cache. Should be called every time we are no longer sure
2039f0bf5fd8Santirez  * that every slave knows about all the scripts in our set, or when the
2040f0bf5fd8Santirez  * current AOF "context" is no longer aware of the script. In general we
2041f0bf5fd8Santirez  * should flush the cache:
2042f0bf5fd8Santirez  *
2043f0bf5fd8Santirez  * 1) Every time a new slave reconnects to this master and performs a
2044f0bf5fd8Santirez  *    full SYNC (PSYNC does not require flushing).
2045f0bf5fd8Santirez  * 2) Every time an AOF rewrite is performed.
2046f0bf5fd8Santirez  * 3) Every time we are left without slaves at all, and AOF is off, in order
2047f0bf5fd8Santirez  *    to reclaim otherwise unused memory.
2048f0bf5fd8Santirez  */
replicationScriptCacheFlush(void)204994ec7db4Santirez void replicationScriptCacheFlush(void) {
20502eb781b3Santirez     dictEmpty(server.repl_scriptcache_dict,NULL);
205194ec7db4Santirez     listRelease(server.repl_scriptcache_fifo);
205294ec7db4Santirez     server.repl_scriptcache_fifo = listCreate();
205394ec7db4Santirez }
205494ec7db4Santirez 
205594ec7db4Santirez /* Add an entry into the script cache, if we reach max number of entries the
205694ec7db4Santirez  * oldest is removed from the list. */
replicationScriptCacheAdd(sds sha1)205794ec7db4Santirez void replicationScriptCacheAdd(sds sha1) {
205894ec7db4Santirez     int retval;
205994ec7db4Santirez     sds key = sdsdup(sha1);
206094ec7db4Santirez 
206194ec7db4Santirez     /* Evict oldest. */
206294ec7db4Santirez     if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size)
206394ec7db4Santirez     {
206494ec7db4Santirez         listNode *ln = listLast(server.repl_scriptcache_fifo);
206594ec7db4Santirez         sds oldest = listNodeValue(ln);
206694ec7db4Santirez 
206794ec7db4Santirez         retval = dictDelete(server.repl_scriptcache_dict,oldest);
20682d9e3eb1Santirez         serverAssert(retval == DICT_OK);
206994ec7db4Santirez         listDelNode(server.repl_scriptcache_fifo,ln);
207094ec7db4Santirez     }
207194ec7db4Santirez 
207294ec7db4Santirez     /* Add current. */
207394ec7db4Santirez     retval = dictAdd(server.repl_scriptcache_dict,key,NULL);
207494ec7db4Santirez     listAddNodeHead(server.repl_scriptcache_fifo,key);
20752d9e3eb1Santirez     serverAssert(retval == DICT_OK);
207694ec7db4Santirez }
207794ec7db4Santirez 
207894ec7db4Santirez /* Returns non-zero if the specified entry exists inside the cache, that is,
207994ec7db4Santirez  * if all the slaves are aware of this script SHA1. */
replicationScriptCacheExists(sds sha1)208094ec7db4Santirez int replicationScriptCacheExists(sds sha1) {
2081f0bf5fd8Santirez     return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
208294ec7db4Santirez }
208394ec7db4Santirez 
2084c5618e7fSantirez /* ----------------------- SYNCHRONOUS REPLICATION --------------------------
2085c5618e7fSantirez  * Redis synchronous replication design can be summarized in points:
2086c5618e7fSantirez  *
2087c5618e7fSantirez  * - Redis masters have a global replication offset, used by PSYNC.
2088c5618e7fSantirez  * - Master increment the offset every time new commands are sent to slaves.
2089c5618e7fSantirez  * - Slaves ping back masters with the offset processed so far.
2090c5618e7fSantirez  *
2091c5618e7fSantirez  * So synchronous replication adds a new WAIT command in the form:
2092c5618e7fSantirez  *
2093c5618e7fSantirez  *   WAIT <num_replicas> <milliseconds_timeout>
2094c5618e7fSantirez  *
2095c5618e7fSantirez  * That returns the number of replicas that processed the query when
2096c5618e7fSantirez  * we finally have at least num_replicas, or when the timeout was
2097c5618e7fSantirez  * reached.
2098c5618e7fSantirez  *
2099c5618e7fSantirez  * The command is implemented in this way:
2100c5618e7fSantirez  *
2101c5618e7fSantirez  * - Every time a client processes a command, we remember the replication
2102c5618e7fSantirez  *   offset after sending that command to the slaves.
2103c5618e7fSantirez  * - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
2104c5618e7fSantirez  *   The client is blocked at the same time (see blocked.c).
2105c5618e7fSantirez  * - Once we receive enough ACKs for a given offset or when the timeout
2106c5618e7fSantirez  *   is reached, the WAIT command is unblocked and the reply sent to the
2107c5618e7fSantirez  *   client.
2108c5618e7fSantirez  */
2109c5618e7fSantirez 
2110c5618e7fSantirez /* This just set a flag so that we broadcast a REPLCONF GETACK command
2111c5618e7fSantirez  * to all the slaves in the beforeSleep() function. Note that this way
2112c5618e7fSantirez  * we "group" all the clients that want to wait for synchronouns replication
2113c5618e7fSantirez  * in a given event loop iteration, and send a single GETACK for them all. */
replicationRequestAckFromSlaves(void)2114c5618e7fSantirez void replicationRequestAckFromSlaves(void) {
2115c5618e7fSantirez     server.get_ack_from_slaves = 1;
2116c5618e7fSantirez }
2117c5618e7fSantirez 
2118c5618e7fSantirez /* Return the number of slaves that already acknowledged the specified
2119c5618e7fSantirez  * replication offset. */
replicationCountAcksByOffset(long long offset)2120c5618e7fSantirez int replicationCountAcksByOffset(long long offset) {
2121c5618e7fSantirez     listIter li;
2122c5618e7fSantirez     listNode *ln;
2123c5618e7fSantirez     int count = 0;
2124c5618e7fSantirez 
2125c5618e7fSantirez     listRewind(server.slaves,&li);
2126c5618e7fSantirez     while((ln = listNext(&li))) {
2127554bd0e7Santirez         client *slave = ln->value;
2128c5618e7fSantirez 
212932f80e2fSantirez         if (slave->replstate != SLAVE_STATE_ONLINE) continue;
2130c5618e7fSantirez         if (slave->repl_ack_off >= offset) count++;
2131c5618e7fSantirez     }
2132c5618e7fSantirez     return count;
2133c5618e7fSantirez }
2134c5618e7fSantirez 
2135c5618e7fSantirez /* WAIT for N replicas to acknowledge the processing of our latest
2136c5618e7fSantirez  * write command (and all the previous commands). */
waitCommand(client * c)2137554bd0e7Santirez void waitCommand(client *c) {
2138c5618e7fSantirez     mstime_t timeout;
2139c5618e7fSantirez     long numreplicas, ackreplicas;
2140c5618e7fSantirez     long long offset = c->woff;
2141c5618e7fSantirez 
2142c5618e7fSantirez     /* Argument parsing. */
214340eb548aSantirez     if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
2144c5618e7fSantirez         return;
2145c5618e7fSantirez     if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
214640eb548aSantirez         != C_OK) return;
2147c5618e7fSantirez 
2148c5618e7fSantirez     /* First try without blocking at all. */
2149c5618e7fSantirez     ackreplicas = replicationCountAcksByOffset(c->woff);
215032f80e2fSantirez     if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) {
2151c5618e7fSantirez         addReplyLongLong(c,ackreplicas);
2152c5618e7fSantirez         return;
2153c5618e7fSantirez     }
2154c5618e7fSantirez 
2155c5618e7fSantirez     /* Otherwise block the client and put it into our list of clients
2156c5618e7fSantirez      * waiting for ack from slaves. */
2157c5618e7fSantirez     c->bpop.timeout = timeout;
2158c5618e7fSantirez     c->bpop.reploffset = offset;
2159c5618e7fSantirez     c->bpop.numreplicas = numreplicas;
2160c5618e7fSantirez     listAddNodeTail(server.clients_waiting_acks,c);
216132f80e2fSantirez     blockClient(c,BLOCKED_WAIT);
2162c5618e7fSantirez 
2163c5618e7fSantirez     /* Make sure that the server will send an ACK request to all the slaves
2164c5618e7fSantirez      * before returning to the event loop. */
2165c5618e7fSantirez     replicationRequestAckFromSlaves();
2166c5618e7fSantirez }
2167c5618e7fSantirez 
2168c5618e7fSantirez /* This is called by unblockClient() to perform the blocking op type
2169c5618e7fSantirez  * specific cleanup. We just remove the client from the list of clients
2170c5618e7fSantirez  * waiting for replica acks. Never call it directly, call unblockClient()
2171c5618e7fSantirez  * instead. */
unblockClientWaitingReplicas(client * c)2172554bd0e7Santirez void unblockClientWaitingReplicas(client *c) {
2173c5618e7fSantirez     listNode *ln = listSearchKey(server.clients_waiting_acks,c);
21742d9e3eb1Santirez     serverAssert(ln != NULL);
2175c5618e7fSantirez     listDelNode(server.clients_waiting_acks,ln);
2176c5618e7fSantirez }
2177c5618e7fSantirez 
2178c5618e7fSantirez /* Check if there are clients blocked in WAIT that can be unblocked since
2179c5618e7fSantirez  * we received enough ACKs from slaves. */
processClientsWaitingReplicas(void)2180c5618e7fSantirez void processClientsWaitingReplicas(void) {
2181c5618e7fSantirez     long long last_offset = 0;
2182c5618e7fSantirez     int last_numreplicas = 0;
2183c5618e7fSantirez 
2184c5618e7fSantirez     listIter li;
2185c5618e7fSantirez     listNode *ln;
2186c5618e7fSantirez 
2187c5618e7fSantirez     listRewind(server.clients_waiting_acks,&li);
2188c5618e7fSantirez     while((ln = listNext(&li))) {
2189554bd0e7Santirez         client *c = ln->value;
2190c5618e7fSantirez 
2191c5618e7fSantirez         /* Every time we find a client that is satisfied for a given
2192c5618e7fSantirez          * offset and number of replicas, we remember it so the next client
2193c5618e7fSantirez          * may be unblocked without calling replicationCountAcksByOffset()
2194c5618e7fSantirez          * if the requested offset / replicas were equal or less. */
2195c5618e7fSantirez         if (last_offset && last_offset > c->bpop.reploffset &&
2196c5618e7fSantirez                            last_numreplicas > c->bpop.numreplicas)
2197c5618e7fSantirez         {
2198c5618e7fSantirez             unblockClient(c);
2199c5618e7fSantirez             addReplyLongLong(c,last_numreplicas);
2200c5618e7fSantirez         } else {
2201c5618e7fSantirez             int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
2202c5618e7fSantirez 
2203c5618e7fSantirez             if (numreplicas >= c->bpop.numreplicas) {
2204c5618e7fSantirez                 last_offset = c->bpop.reploffset;
2205c5618e7fSantirez                 last_numreplicas = numreplicas;
2206c5618e7fSantirez                 unblockClient(c);
2207c5618e7fSantirez                 addReplyLongLong(c,numreplicas);
2208c5618e7fSantirez             }
2209c5618e7fSantirez         }
2210c5618e7fSantirez     }
2211c5618e7fSantirez }
2212c5618e7fSantirez 
22136f540320Santirez /* Return the slave replication offset for this instance, that is
22146f540320Santirez  * the offset for which we already processed the master replication stream. */
replicationGetSlaveOffset(void)22156f540320Santirez long long replicationGetSlaveOffset(void) {
22166f540320Santirez     long long offset = 0;
22176f540320Santirez 
22186f540320Santirez     if (server.masterhost != NULL) {
22196f540320Santirez         if (server.master) {
22206f540320Santirez             offset = server.master->reploff;
22216f540320Santirez         } else if (server.cached_master) {
22226f540320Santirez             offset = server.cached_master->reploff;
22236f540320Santirez         }
22246f540320Santirez     }
22256f540320Santirez     /* offset may be -1 when the master does not support it at all, however
22266f540320Santirez      * this function is designed to return an offset that can express the
22276f540320Santirez      * amount of data processed by the master, so we return a positive
22286f540320Santirez      * integer. */
22296f540320Santirez     if (offset < 0) offset = 0;
22306f540320Santirez     return offset;
22316f540320Santirez }
22326f540320Santirez 
2233c5618e7fSantirez /* --------------------------- REPLICATION CRON  ---------------------------- */
2234f4aa600bSantirez 
22353a82b8acSAaron Rutkovsky /* Replication cron function, called 1 time per second. */
replicationCron(void)2236f4aa600bSantirez void replicationCron(void) {
223755ba7727Santirez     static long long replication_cron_loops = 0;
223855ba7727Santirez 
223927acd7aaSantirez     /* Non blocking connection timeout? */
2240bb66fc31Santirez     if (server.masterhost &&
224132f80e2fSantirez         (server.repl_state == REPL_STATE_CONNECTING ||
2242e3344b80Santirez          slaveIsInHandshakeState()) &&
224327acd7aaSantirez          (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
224427acd7aaSantirez     {
224532f80e2fSantirez         serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
2246c5f8c80aSantirez         cancelReplicationHandshake();
224727acd7aaSantirez     }
224827acd7aaSantirez 
2249f4aa600bSantirez     /* Bulk transfer I/O timeout? */
225032f80e2fSantirez     if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
22518996bf77Santirez         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
2252f4aa600bSantirez     {
225332f80e2fSantirez         serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
2254c5f8c80aSantirez         cancelReplicationHandshake();
2255f4aa600bSantirez     }
2256f4aa600bSantirez 
225789a1433eSantirez     /* Timed out master when we are an already connected slave? */
225832f80e2fSantirez     if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
22598996bf77Santirez         (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
226089a1433eSantirez     {
226132f80e2fSantirez         serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
226289a1433eSantirez         freeClient(server.master);
226389a1433eSantirez     }
226489a1433eSantirez 
2265f4aa600bSantirez     /* Check if we should connect to a MASTER */
226632f80e2fSantirez     if (server.repl_state == REPL_STATE_CONNECT) {
226732f80e2fSantirez         serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
2268b2f83439Santirez             server.masterhost, server.masterport);
226940eb548aSantirez         if (connectWithMaster() == C_OK) {
227032f80e2fSantirez             serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
2271f4aa600bSantirez         }
2272f4aa600bSantirez     }
2273e06a5604Santirez 
227490a81b4eSantirez     /* Send ACK to master from time to time.
227590a81b4eSantirez      * Note that we do not send periodic acks to masters that don't
227690a81b4eSantirez      * support PSYNC and replication offsets. */
227790a81b4eSantirez     if (server.masterhost && server.master &&
227832f80e2fSantirez         !(server.master->flags & CLIENT_PRE_PSYNC))
2279e06a5604Santirez         replicationSendAck();
228089a1433eSantirez 
228189a1433eSantirez     /* If we have attached slaves, PING them from time to time.
228289a1433eSantirez      * So slaves can implement an explicit timeout to masters, and will
228389a1433eSantirez      * be able to detect a link disconnection even if the TCP connection
228489a1433eSantirez      * will not actually go down. */
228589a1433eSantirez     listIter li;
228689a1433eSantirez     listNode *ln;
22874b83ad4eSantirez     robj *ping_argv[1];
228889a1433eSantirez 
228955ba7727Santirez     /* First, send PING according to ping_slave_period. */
229055ba7727Santirez     if ((replication_cron_loops % server.repl_ping_slave_period) == 0) {
22914b83ad4eSantirez         ping_argv[0] = createStringObject("PING",4);
229255ba7727Santirez         replicationFeedSlaves(server.slaves, server.slaveseldb,
229355ba7727Santirez             ping_argv, 1);
22944b83ad4eSantirez         decrRefCount(ping_argv[0]);
229555ba7727Santirez     }
22964b83ad4eSantirez 
229707888202Santirez     /* Second, send a newline to all the slaves in pre-synchronization
229807888202Santirez      * stage, that is, slaves waiting for the master to create the RDB file.
22994b83ad4eSantirez      * The newline will be ignored by the slave but will refresh the
230055ba7727Santirez      * last-io timer preventing a timeout. In this case we ignore the
230155ba7727Santirez      * ping period and refresh the connection once per second since certain
230255ba7727Santirez      * timeouts are set at a few seconds (example: PSYNC response). */
230389a1433eSantirez     listRewind(server.slaves,&li);
230489a1433eSantirez     while((ln = listNext(&li))) {
2305554bd0e7Santirez         client *slave = ln->value;
230689a1433eSantirez 
230732f80e2fSantirez         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
230832f80e2fSantirez             (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
230932f80e2fSantirez              server.rdb_child_type != RDB_CHILD_TYPE_SOCKET))
23104b16263bSantirez         {
2311f96a9f82Santirez             if (write(slave->fd, "\n", 1) == -1) {
2312f96a9f82Santirez                 /* Don't worry, it's just a ping. */
2313f96a9f82Santirez             }
231489a1433eSantirez         }
231589a1433eSantirez     }
231607888202Santirez 
23173c82c85fSantirez     /* Disconnect timedout slaves. */
23183c82c85fSantirez     if (listLength(server.slaves)) {
23193c82c85fSantirez         listIter li;
23203c82c85fSantirez         listNode *ln;
23213c82c85fSantirez 
23223c82c85fSantirez         listRewind(server.slaves,&li);
23233c82c85fSantirez         while((ln = listNext(&li))) {
2324554bd0e7Santirez             client *slave = ln->value;
23253c82c85fSantirez 
232632f80e2fSantirez             if (slave->replstate != SLAVE_STATE_ONLINE) continue;
232732f80e2fSantirez             if (slave->flags & CLIENT_PRE_PSYNC) continue;
23283c82c85fSantirez             if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
23293c82c85fSantirez             {
233032f80e2fSantirez                 serverLog(LL_WARNING, "Disconnecting timedout slave: %s",
23318a416ca4Santirez                     replicationGetSlaveName(slave));
2332f5c6ebbfSantirez                 freeClient(slave);
23333c82c85fSantirez             }
23343c82c85fSantirez         }
23353c82c85fSantirez     }
23363c82c85fSantirez 
233707888202Santirez     /* If we have no attached slaves and there is a replication backlog
233807888202Santirez      * using memory, free it after some (configured) time. */
233907888202Santirez     if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
234007888202Santirez         server.repl_backlog)
234107888202Santirez     {
234207888202Santirez         time_t idle = server.unixtime - server.repl_no_slaves_since;
234307888202Santirez 
234407888202Santirez         if (idle > server.repl_backlog_time_limit) {
234507888202Santirez             freeReplicationBacklog();
234632f80e2fSantirez             serverLog(LL_NOTICE,
234707888202Santirez                 "Replication backlog freed after %d seconds "
2348f9b5ca29Santirez                 "without connected slaves.",
2349f9b5ca29Santirez                 (int) server.repl_backlog_time_limit);
235007888202Santirez         }
235107888202Santirez     }
2352ed599d3aSantirez 
2353f0bf5fd8Santirez     /* If AOF is disabled and we no longer have attached slaves, we can
2354f0bf5fd8Santirez      * free our Replication Script Cache as there is no need to propagate
2355f0bf5fd8Santirez      * EVALSHA at all. */
2356f0bf5fd8Santirez     if (listLength(server.slaves) == 0 &&
235732f80e2fSantirez         server.aof_state == AOF_OFF &&
2358f0bf5fd8Santirez         listLength(server.repl_scriptcache_fifo) != 0)
2359f0bf5fd8Santirez     {
2360f0bf5fd8Santirez         replicationScriptCacheFlush();
2361f0bf5fd8Santirez     }
2362f0bf5fd8Santirez 
2363017378ecSantirez     /* Start a BGSAVE good for replication if we have slaves in
2364017378ecSantirez      * WAIT_BGSAVE_START state.
236542951ab3Santirez      *
2366017378ecSantirez      * In case of diskless replication, we make sure to wait the specified
2367017378ecSantirez      * number of seconds (according to configuration) so that other slaves
2368017378ecSantirez      * have the time to arrive before we start streaming. */
236942951ab3Santirez     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
2370e9e00755Santirez         time_t idle, max_idle = 0;
2371e9e00755Santirez         int slaves_waiting = 0;
23723e6d4d59Santirez         int mincapa = -1;
2373e9e00755Santirez         listNode *ln;
2374e9e00755Santirez         listIter li;
2375e9e00755Santirez 
2376e9e00755Santirez         listRewind(server.slaves,&li);
2377e9e00755Santirez         while((ln = listNext(&li))) {
2378554bd0e7Santirez             client *slave = ln->value;
237932f80e2fSantirez             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
2380e9e00755Santirez                 idle = server.unixtime - slave->lastinteraction;
2381e9e00755Santirez                 if (idle > max_idle) max_idle = idle;
2382e9e00755Santirez                 slaves_waiting++;
23833e6d4d59Santirez                 mincapa = (mincapa == -1) ? slave->slave_capa :
23843e6d4d59Santirez                                             (mincapa & slave->slave_capa);
2385e9e00755Santirez             }
2386e9e00755Santirez         }
2387e9e00755Santirez 
2388017378ecSantirez         if (slaves_waiting &&
2389017378ecSantirez             (!server.repl_diskless_sync ||
2390017378ecSantirez              max_idle > server.repl_diskless_sync_delay))
2391017378ecSantirez         {
2392017378ecSantirez             /* Start the BGSAVE. The called function may start a
2393017378ecSantirez              * BGSAVE with socket target or disk target depending on the
2394017378ecSantirez              * configuration and slaves capabilities. */
2395f18e5b63Santirez             startBgsaveForReplication(mincapa);
2396e9e00755Santirez         }
2397e9e00755Santirez     }
2398e9e00755Santirez 
2399ed599d3aSantirez     /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
2400ed599d3aSantirez     refreshGoodSlavesCount();
240155ba7727Santirez     replication_cron_loops++; /* Incremented with frequency 1 HZ. */
2402f4aa600bSantirez }
2403