14365e5b2Santirez /* Asynchronous replication implementation.
24365e5b2Santirez *
34365e5b2Santirez * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
44365e5b2Santirez * All rights reserved.
54365e5b2Santirez *
64365e5b2Santirez * Redistribution and use in source and binary forms, with or without
74365e5b2Santirez * modification, are permitted provided that the following conditions are met:
84365e5b2Santirez *
94365e5b2Santirez * * Redistributions of source code must retain the above copyright notice,
104365e5b2Santirez * this list of conditions and the following disclaimer.
114365e5b2Santirez * * Redistributions in binary form must reproduce the above copyright
124365e5b2Santirez * notice, this list of conditions and the following disclaimer in the
134365e5b2Santirez * documentation and/or other materials provided with the distribution.
144365e5b2Santirez * * Neither the name of Redis nor the names of its contributors may be used
154365e5b2Santirez * to endorse or promote products derived from this software without
164365e5b2Santirez * specific prior written permission.
174365e5b2Santirez *
184365e5b2Santirez * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
194365e5b2Santirez * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
204365e5b2Santirez * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
214365e5b2Santirez * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
224365e5b2Santirez * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
234365e5b2Santirez * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
244365e5b2Santirez * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
254365e5b2Santirez * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
264365e5b2Santirez * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
274365e5b2Santirez * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
284365e5b2Santirez * POSSIBILITY OF SUCH DAMAGE.
294365e5b2Santirez */
304365e5b2Santirez
314365e5b2Santirez
32cef054e8Santirez #include "server.h"
33e2641e09Santirez
34e2641e09Santirez #include <sys/time.h>
35e2641e09Santirez #include <unistd.h>
36e2641e09Santirez #include <fcntl.h>
37d310fbedSantirez #include <sys/socket.h>
38e2641e09Santirez #include <sys/stat.h>
39e2641e09Santirez
4007888202Santirez void replicationDiscardCachedMaster(void);
4107888202Santirez void replicationResurrectCachedMaster(int newfd);
42c5618e7fSantirez void replicationSendAck(void);
43554bd0e7Santirez void putSlaveOnline(client *slave);
44c5f8c80aSantirez int cancelReplicationHandshake(void);
4507888202Santirez
468a416ca4Santirez /* --------------------------- Utility functions ---------------------------- */
478a416ca4Santirez
488a416ca4Santirez /* Return the pointer to a string representing the slave ip:listening_port
498a416ca4Santirez * pair. Mostly useful for logging, since we want to log a slave using its
500a45fbc3Santirez * IP address and its listening port which is more clear for the user, for
518a416ca4Santirez * example: "Closing connection with slave 10.1.2.3:6380". */
replicationGetSlaveName(client * c)52554bd0e7Santirez char *replicationGetSlaveName(client *c) {
5332f80e2fSantirez static char buf[NET_PEER_ID_LEN];
5432f80e2fSantirez char ip[NET_IP_STR_LEN];
558a416ca4Santirez
568a416ca4Santirez ip[0] = '\0';
578a416ca4Santirez buf[0] = '\0';
580a45fbc3Santirez if (c->slave_ip[0] != '\0' ||
590a45fbc3Santirez anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1)
600a45fbc3Santirez {
610a45fbc3Santirez /* Note that the 'ip' buffer is always larger than 'c->slave_ip' */
620a45fbc3Santirez if (c->slave_ip[0] != '\0') memcpy(ip,c->slave_ip,sizeof(c->slave_ip));
630a45fbc3Santirez
648a416ca4Santirez if (c->slave_listening_port)
65ce269ad3Santirez anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port);
668a416ca4Santirez else
678a416ca4Santirez snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip);
688a416ca4Santirez } else {
698a416ca4Santirez snprintf(buf,sizeof(buf),"client id #%llu",
708a416ca4Santirez (unsigned long long) c->id);
718a416ca4Santirez }
728a416ca4Santirez return buf;
738a416ca4Santirez }
748a416ca4Santirez
75f4aa600bSantirez /* ---------------------------------- MASTER -------------------------------- */
76f4aa600bSantirez
createReplicationBacklog(void)7707888202Santirez void createReplicationBacklog(void) {
782d9e3eb1Santirez serverAssert(server.repl_backlog == NULL);
7907888202Santirez server.repl_backlog = zmalloc(server.repl_backlog_size);
8007888202Santirez server.repl_backlog_histlen = 0;
8107888202Santirez server.repl_backlog_idx = 0;
8207888202Santirez /* When a new backlog buffer is created, we increment the replication
8307888202Santirez * offset by one to make sure we'll not be able to PSYNC with any
8407888202Santirez * previous slave. This is needed because we avoid incrementing the
8507888202Santirez * master_repl_offset if no backlog exists nor slaves are attached. */
8607888202Santirez server.master_repl_offset++;
8707888202Santirez
8807888202Santirez /* We don't have any data inside our buffer, but virtually the first
8907888202Santirez * byte we have is the next byte that will be generated for the
9007888202Santirez * replication stream. */
9107888202Santirez server.repl_backlog_off = server.master_repl_offset+1;
9207888202Santirez }
9307888202Santirez
9407888202Santirez /* This function is called when the user modifies the replication backlog
9507888202Santirez * size at runtime. It is up to the function to both update the
9607888202Santirez * server.repl_backlog_size and to resize the buffer and setup it so that
9707888202Santirez * it contains the same data as the previous one (possibly less data, but
9807888202Santirez * the most recent bytes, or the same data and more free space in case the
9907888202Santirez * buffer is enlarged). */
resizeReplicationBacklog(long long newsize)10007888202Santirez void resizeReplicationBacklog(long long newsize) {
10132f80e2fSantirez if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
10232f80e2fSantirez newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
10307888202Santirez if (server.repl_backlog_size == newsize) return;
10407888202Santirez
10507888202Santirez server.repl_backlog_size = newsize;
10607888202Santirez if (server.repl_backlog != NULL) {
10707888202Santirez /* What we actually do is to flush the old buffer and realloc a new
10807888202Santirez * empty one. It will refill with new data incrementally.
10907888202Santirez * The reason is that copying a few gigabytes adds latency and even
11007888202Santirez * worse often we need to alloc additional space before freeing the
11107888202Santirez * old buffer. */
11207888202Santirez zfree(server.repl_backlog);
11307888202Santirez server.repl_backlog = zmalloc(server.repl_backlog_size);
11407888202Santirez server.repl_backlog_histlen = 0;
11507888202Santirez server.repl_backlog_idx = 0;
1163a82b8acSAaron Rutkovsky /* Next byte we have is... the next since the buffer is empty. */
11707888202Santirez server.repl_backlog_off = server.master_repl_offset+1;
11807888202Santirez }
11907888202Santirez }
12007888202Santirez
freeReplicationBacklog(void)12107888202Santirez void freeReplicationBacklog(void) {
1222d9e3eb1Santirez serverAssert(listLength(server.slaves) == 0);
12307888202Santirez zfree(server.repl_backlog);
12407888202Santirez server.repl_backlog = NULL;
12507888202Santirez }
12607888202Santirez
12707888202Santirez /* Add data to the replication backlog.
12807888202Santirez * This function also increments the global replication offset stored at
12907888202Santirez * server.master_repl_offset, because there is no case where we want to feed
13007888202Santirez * the backlog without incrementing the buffer. */
feedReplicationBacklog(void * ptr,size_t len)13107888202Santirez void feedReplicationBacklog(void *ptr, size_t len) {
13207888202Santirez unsigned char *p = ptr;
13307888202Santirez
13407888202Santirez server.master_repl_offset += len;
13507888202Santirez
13607888202Santirez /* This is a circular buffer, so write as much data we can at every
13707888202Santirez * iteration and rewind the "idx" index if we reach the limit. */
13807888202Santirez while(len) {
13907888202Santirez size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
14007888202Santirez if (thislen > len) thislen = len;
14107888202Santirez memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
14207888202Santirez server.repl_backlog_idx += thislen;
14307888202Santirez if (server.repl_backlog_idx == server.repl_backlog_size)
14407888202Santirez server.repl_backlog_idx = 0;
14507888202Santirez len -= thislen;
14607888202Santirez p += thislen;
14707888202Santirez server.repl_backlog_histlen += thislen;
14807888202Santirez }
14907888202Santirez if (server.repl_backlog_histlen > server.repl_backlog_size)
15007888202Santirez server.repl_backlog_histlen = server.repl_backlog_size;
15107888202Santirez /* Set the offset of the first byte we have in the backlog. */
15207888202Santirez server.repl_backlog_off = server.master_repl_offset -
15307888202Santirez server.repl_backlog_histlen + 1;
15407888202Santirez }
15507888202Santirez
15607888202Santirez /* Wrapper for feedReplicationBacklog() that takes Redis string objects
15707888202Santirez * as input. */
feedReplicationBacklogWithObject(robj * o)15807888202Santirez void feedReplicationBacklogWithObject(robj *o) {
15932f80e2fSantirez char llstr[LONG_STR_SIZE];
16007888202Santirez void *p;
16107888202Santirez size_t len;
16207888202Santirez
16314ff5724Santirez if (o->encoding == OBJ_ENCODING_INT) {
16407888202Santirez len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
16507888202Santirez p = llstr;
16607888202Santirez } else {
16707888202Santirez len = sdslen(o->ptr);
16807888202Santirez p = o->ptr;
16907888202Santirez }
17007888202Santirez feedReplicationBacklog(p,len);
17107888202Santirez }
17207888202Santirez
/* Propagate the command described by 'argv'/'argc', executed against the
 * database 'dictid', to the replication backlog and to every slave in the
 * 'slaves' list.
 *
 * If the last database selected in the replication stream
 * (server.slaveseldb) differs from 'dictid', a SELECT command is emitted
 * first, both in the backlog and to the slaves. The command itself is
 * written in multi bulk format.
 *
 * Slaves in SLAVE_STATE_WAIT_BGSAVE_START state are skipped. Nothing is
 * done when there is no backlog and the list of slaves is empty. Having
 * slaves but no backlog is a programming error and triggers an assertion. */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[LONG_STR_SIZE];

    /* If there aren't slaves, and there is no backlog buffer to populate,
     * we can return ASAP. */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* We can't have slaves attached and no backlog. */
    serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    /* Send SELECT command to every slave if needed. */
    if (server.slaveseldb != dictid) {
        robj *selectcmd;

        /* For a few DBs we have pre-computed SELECT command. */
        if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
            int dictid_len;

            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(OBJ_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }

        /* Add the SELECT command into the backlog. */
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

        /* Send it to slaves. */
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
            addReply(slave,selectcmd);
        }

        /* Only the object created on the fly is owned by us: the shared
         * pre-computed commands must not be released. */
        if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }
    server.slaveseldb = dictid;

    /* Write the command to the replication backlog if any. */
    if (server.repl_backlog) {
        char aux[LONG_STR_SIZE+3];

        /* Add the multi bulk reply length. */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);

        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);

            /* We need to feed the buffer with the object as a bulk reply
             * not just as a plain string, so create the $..CRLF payload len
             * and add the final CRLF */
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);
            feedReplicationBacklogWithObject(argv[j]);
            /* The trailing CRLF is the one already stored at the end of
             * the length header in 'aux'. */
            feedReplicationBacklog(aux+len+1,2);
        }
    }

    /* Write the command to every slave. Iterate the list received as
     * argument, consistently with the checks and the SELECT loop above,
     * instead of hardcoding server.slaves. */
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

        /* Feed slaves that are waiting for the initial SYNC (so these commands
         * are queued in the output buffer until the initial SYNC completes),
         * or are already in sync with the master. */

        /* Add the multi bulk length. */
        addReplyMultiBulkLen(slave,argc);

        /* Then every argument of the command as a bulk reply. */
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}
267e2641e09Santirez
/* Send to every client in the 'monitors' list a textual representation of
 * the command 'argv'/'argc' issued by client 'c' against database 'dictid'.
 *
 * The emitted line starts with '+', followed by the current time as
 * seconds.microseconds, then a bracketed origin (database number plus
 * "lua", the unix socket path, or the client peer ID), then the quoted
 * arguments separated by spaces, terminated by CRLF. */
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
    listIter li;
    listNode *ln;
    struct timeval now;
    robj *lineobj;
    sds line = sdsnew("+");
    int i;

    /* Timestamp. */
    gettimeofday(&now,NULL);
    line = sdscatprintf(line,"%ld.%06ld ",(long)now.tv_sec,(long)now.tv_usec);

    /* Origin of the command. */
    if (c->flags & CLIENT_LUA) {
        line = sdscatprintf(line,"[%d lua] ",dictid);
    } else if (c->flags & CLIENT_UNIX_SOCKET) {
        line = sdscatprintf(line,"[%d unix:%s] ",dictid,server.unixsocket);
    } else {
        line = sdscatprintf(line,"[%d %s] ",dictid,getClientPeerId(c));
    }

    /* Arguments, space separated. */
    for (i = 0; i < argc; i++) {
        robj *arg = argv[i];

        if (i > 0) line = sdscatlen(line," ",1);
        if (arg->encoding == OBJ_ENCODING_INT) {
            line = sdscatprintf(line, "\"%ld\"", (long)arg->ptr);
        } else {
            line = sdscatrepr(line,(char*)arg->ptr,sdslen(arg->ptr));
        }
    }
    line = sdscatlen(line,"\r\n",2);

    /* Wrap the line into an object so that the same reply is handed to
     * all the monitors, then drop our own reference. */
    lineobj = createObject(OBJ_STRING,line);
    listRewind(monitors,&li);
    while ((ln = listNext(&li)) != NULL) {
        client *monitor = ln->value;
        addReply(monitor,lineobj);
    }
    decrRefCount(lineobj);
}
306e2641e09Santirez
30707888202Santirez /* Feed the slave 'c' with the replication backlog starting from the
30807888202Santirez * specified 'offset' up to the end of the backlog. */
addReplyReplicationBacklog(client * c,long long offset)309554bd0e7Santirez long long addReplyReplicationBacklog(client *c, long long offset) {
31007888202Santirez long long j, skip, len;
31107888202Santirez
31232f80e2fSantirez serverLog(LL_DEBUG, "[PSYNC] Slave request offset: %lld", offset);
31307888202Santirez
31407888202Santirez if (server.repl_backlog_histlen == 0) {
31532f80e2fSantirez serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
31607888202Santirez return 0;
31707888202Santirez }
31807888202Santirez
31932f80e2fSantirez serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
3203af478e9Santirez server.repl_backlog_size);
32132f80e2fSantirez serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
3223af478e9Santirez server.repl_backlog_off);
32332f80e2fSantirez serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
3243af478e9Santirez server.repl_backlog_histlen);
32532f80e2fSantirez serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
3263af478e9Santirez server.repl_backlog_idx);
32707888202Santirez
32807888202Santirez /* Compute the amount of bytes we need to discard. */
32907888202Santirez skip = offset - server.repl_backlog_off;
33032f80e2fSantirez serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
33107888202Santirez
33207888202Santirez /* Point j to the oldest byte, that is actaully our
33307888202Santirez * server.repl_backlog_off byte. */
33407888202Santirez j = (server.repl_backlog_idx +
33507888202Santirez (server.repl_backlog_size-server.repl_backlog_histlen)) %
33607888202Santirez server.repl_backlog_size;
33732f80e2fSantirez serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
33807888202Santirez
33907888202Santirez /* Discard the amount of data to seek to the specified 'offset'. */
34007888202Santirez j = (j + skip) % server.repl_backlog_size;
34107888202Santirez
34207888202Santirez /* Feed slave with data. Since it is a circular buffer we have to
34307888202Santirez * split the reply in two parts if we are cross-boundary. */
34407888202Santirez len = server.repl_backlog_histlen - skip;
34532f80e2fSantirez serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
34607888202Santirez while(len) {
34707888202Santirez long long thislen =
34807888202Santirez ((server.repl_backlog_size - j) < len) ?
34907888202Santirez (server.repl_backlog_size - j) : len;
35007888202Santirez
35132f80e2fSantirez serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
35207888202Santirez addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
35307888202Santirez len -= thislen;
35407888202Santirez j = 0;
35507888202Santirez }
35607888202Santirez return server.repl_backlog_histlen - skip;
35707888202Santirez }
35807888202Santirez
359292fec05Santirez /* Return the offset to provide as reply to the PSYNC command received
360292fec05Santirez * from the slave. The returned value is only valid immediately after
361292fec05Santirez * the BGSAVE process started and before executing any other command
362292fec05Santirez * from clients. */
getPsyncInitialOffset(void)363292fec05Santirez long long getPsyncInitialOffset(void) {
364292fec05Santirez long long psync_offset = server.master_repl_offset;
365292fec05Santirez /* Add 1 to psync_offset if it the replication backlog does not exists
366292fec05Santirez * as when it will be created later we'll increment the offset by one. */
367292fec05Santirez if (server.repl_backlog == NULL) psync_offset++;
368292fec05Santirez return psync_offset;
369292fec05Santirez }
370292fec05Santirez
37115de6b10Santirez /* Send a FULLRESYNC reply in the specific case of a full resynchronization,
37215de6b10Santirez * as a side effect setup the slave for a full sync in different ways:
37315de6b10Santirez *
37415de6b10Santirez * 1) Remember, into the slave client structure, the offset we sent
37515de6b10Santirez * here, so that if new slaves will later attach to the same
376292fec05Santirez * background RDB saving process (by duplicating this client output
37715de6b10Santirez * buffer), we can get the right offset from this slave.
37815de6b10Santirez * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
37915de6b10Santirez * we start accumulating differences from this point.
38015de6b10Santirez * 3) Force the replication stream to re-emit a SELECT statement so
38115de6b10Santirez * the new slave incremental differences will start selecting the
38215de6b10Santirez * right database number.
383f18e5b63Santirez *
384f18e5b63Santirez * Normally this function should be called immediately after a successful
385f18e5b63Santirez * BGSAVE for replication was started, or when there is one already in
386f18e5b63Santirez * progress that we attached our slave to. */
replicationSetupSlaveForFullResync(client * slave,long long offset)38715de6b10Santirez int replicationSetupSlaveForFullResync(client *slave, long long offset) {
388292fec05Santirez char buf[128];
389292fec05Santirez int buflen;
390292fec05Santirez
391292fec05Santirez slave->psync_initial_offset = offset;
39215de6b10Santirez slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
39315de6b10Santirez /* We are going to accumulate the incremental changes for this
39415de6b10Santirez * slave as well. Set slaveseldb to -1 in order to force to re-emit
39515de6b10Santirez * a SLEECT statement in the replication stream. */
39615de6b10Santirez server.slaveseldb = -1;
39715de6b10Santirez
398292fec05Santirez /* Don't send this reply to slaves that approached us with
399292fec05Santirez * the old SYNC command. */
400292fec05Santirez if (!(slave->flags & CLIENT_PRE_PSYNC)) {
401292fec05Santirez buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
402292fec05Santirez server.runid,offset);
403292fec05Santirez if (write(slave->fd,buf,buflen) != buflen) {
404292fec05Santirez freeClientAsync(slave);
405292fec05Santirez return C_ERR;
406292fec05Santirez }
407292fec05Santirez }
408292fec05Santirez return C_OK;
409292fec05Santirez }
410292fec05Santirez
41107888202Santirez /* This function handles the PSYNC command from the point of view of a
41207888202Santirez * master receiving a request for partial resynchronization.
41307888202Santirez *
41440eb548aSantirez * On success return C_OK, otherwise C_ERR is returned and we proceed
41507888202Santirez * with the usual full resync. */
masterTryPartialResynchronization(client * c)416554bd0e7Santirez int masterTryPartialResynchronization(client *c) {
41707888202Santirez long long psync_offset, psync_len;
41807888202Santirez char *master_runid = c->argv[1]->ptr;
4190ed6daa4Santirez char buf[128];
4200ed6daa4Santirez int buflen;
42107888202Santirez
42207888202Santirez /* Is the runid of this master the same advertised by the wannabe slave
42307888202Santirez * via PSYNC? If runid changed this master is a different instance and
42407888202Santirez * there is no way to continue. */
42507888202Santirez if (strcasecmp(master_runid, server.runid)) {
42607888202Santirez /* Run id "?" is used by slaves that want to force a full resync. */
42707888202Santirez if (master_runid[0] != '?') {
42832f80e2fSantirez serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
429707ff0f7Santirez "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
43007888202Santirez master_runid, server.runid);
43107888202Santirez } else {
43232f80e2fSantirez serverLog(LL_NOTICE,"Full resync requested by slave %s",
4334b8f4b90Santirez replicationGetSlaveName(c));
43407888202Santirez }
43507888202Santirez goto need_full_resync;
43607888202Santirez }
43707888202Santirez
43807888202Santirez /* We still have the data our slave is asking for? */
43907888202Santirez if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
44040eb548aSantirez C_OK) goto need_full_resync;
44107888202Santirez if (!server.repl_backlog ||
44207888202Santirez psync_offset < server.repl_backlog_off ||
44337e06bd9Santirez psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
44407888202Santirez {
44532f80e2fSantirez serverLog(LL_NOTICE,
4464b8f4b90Santirez "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
44737e06bd9Santirez if (psync_offset > server.master_repl_offset) {
44832f80e2fSantirez serverLog(LL_WARNING,
4494b8f4b90Santirez "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
45037e06bd9Santirez }
45107888202Santirez goto need_full_resync;
45207888202Santirez }
45307888202Santirez
45407888202Santirez /* If we reached this point, we are able to perform a partial resync:
45507888202Santirez * 1) Set client state to make it a slave.
45607888202Santirez * 2) Inform the client we can continue with +CONTINUE
45707888202Santirez * 3) Send the backlog data (from the offset to the end) to the slave. */
45832f80e2fSantirez c->flags |= CLIENT_SLAVE;
45932f80e2fSantirez c->replstate = SLAVE_STATE_ONLINE;
4603c82c85fSantirez c->repl_ack_time = server.unixtime;
461bb7fea0dSantirez c->repl_put_online_on_ack = 0;
46207888202Santirez listAddNodeTail(server.slaves,c);
4630ed6daa4Santirez /* We can't use the connection buffers since they are used to accumulate
4640ed6daa4Santirez * new commands at this stage. But we are sure the socket send buffer is
4653a82b8acSAaron Rutkovsky * empty so this write will never fail actually. */
4660ed6daa4Santirez buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
4670ed6daa4Santirez if (write(c->fd,buf,buflen) != buflen) {
4680ed6daa4Santirez freeClientAsync(c);
46940eb548aSantirez return C_OK;
4700ed6daa4Santirez }
47107888202Santirez psync_len = addReplyReplicationBacklog(c,psync_offset);
47232f80e2fSantirez serverLog(LL_NOTICE,
4734b8f4b90Santirez "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
4744b8f4b90Santirez replicationGetSlaveName(c),
4754b8f4b90Santirez psync_len, psync_offset);
47607888202Santirez /* Note that we don't need to set the selected DB at server.slaveseldb
47707888202Santirez * to -1 to force the master to emit SELECT, since the slave already
47807888202Santirez * has this state from the previous connection with the master. */
47907888202Santirez
4801a54d596Santirez refreshGoodSlavesCount();
48140eb548aSantirez return C_OK; /* The caller can return, no full resync needed. */
48207888202Santirez
48307888202Santirez need_full_resync:
484292fec05Santirez /* We need a full resync for some reason... Note that we can't
485292fec05Santirez * reply to PSYNC right now if a full SYNC is needed. The reply
486292fec05Santirez * must include the master offset at the time the RDB file we transfer
487292fec05Santirez * is generated, so we need to delay the reply to that moment. */
48840eb548aSantirez return C_ERR;
48907888202Santirez }
49007888202Santirez
49175f0cd65Santirez /* Start a BGSAVE for replication goals, which is, selecting the disk or
49275f0cd65Santirez * socket target depending on the configuration, and making sure that
49375f0cd65Santirez * the script cache is flushed before to start.
49475f0cd65Santirez *
4953e6d4d59Santirez * The mincapa argument is the bitwise AND among all the slaves capabilities
4963e6d4d59Santirez * of the slaves waiting for this BGSAVE, so represents the slave capabilities
4973e6d4d59Santirez * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
4983e6d4d59Santirez *
499f18e5b63Santirez * Side effects, other than starting a BGSAVE:
500f18e5b63Santirez *
501f18e5b63Santirez * 1) Handle the slaves in WAIT_START state, by preparing them for a full
502f18e5b63Santirez * sync if the BGSAVE was succesfully started, or sending them an error
503f18e5b63Santirez * and dropping them from the list of slaves.
504f18e5b63Santirez *
505f18e5b63Santirez * 2) Flush the Lua scripting script cache if the BGSAVE was actually
506f18e5b63Santirez * started.
507f18e5b63Santirez *
50840eb548aSantirez * Returns C_OK on success or C_ERR otherwise. */
startBgsaveForReplication(int mincapa)5093e6d4d59Santirez int startBgsaveForReplication(int mincapa) {
51075f0cd65Santirez int retval;
511ce5761e0Santirez int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
512f18e5b63Santirez listIter li;
513f18e5b63Santirez listNode *ln;
51475f0cd65Santirez
51532f80e2fSantirez serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
516ce5761e0Santirez socket_target ? "slaves sockets" : "disk");
51775f0cd65Santirez
518ce5761e0Santirez if (socket_target)
51975f0cd65Santirez retval = rdbSaveToSlavesSockets();
52075f0cd65Santirez else
52175f0cd65Santirez retval = rdbSaveBackground(server.rdb_filename);
52275f0cd65Santirez
523f18e5b63Santirez /* If we failed to BGSAVE, remove the slaves waiting for a full
524f18e5b63Santirez * resynchorinization from the list of salves, inform them with
525f18e5b63Santirez * an error about what happened, close the connection ASAP. */
526f18e5b63Santirez if (retval == C_ERR) {
527f18e5b63Santirez serverLog(LL_WARNING,"BGSAVE for replication failed");
528f18e5b63Santirez listRewind(server.slaves,&li);
529f18e5b63Santirez while((ln = listNext(&li))) {
530f18e5b63Santirez client *slave = ln->value;
531f18e5b63Santirez
532f18e5b63Santirez if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
533f18e5b63Santirez slave->flags &= ~CLIENT_SLAVE;
534f18e5b63Santirez listDelNode(server.slaves,ln);
535f18e5b63Santirez addReplyError(slave,
536f18e5b63Santirez "BGSAVE failed, replication can't continue");
537f18e5b63Santirez slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
538f18e5b63Santirez }
539f18e5b63Santirez }
540f18e5b63Santirez return retval;
541f18e5b63Santirez }
542f18e5b63Santirez
543f18e5b63Santirez /* If the target is socket, rdbSaveToSlavesSockets() already setup
544f18e5b63Santirez * the salves for a full resync. Otherwise for disk target do it now.*/
545f18e5b63Santirez if (!socket_target) {
546f18e5b63Santirez listRewind(server.slaves,&li);
547f18e5b63Santirez while((ln = listNext(&li))) {
548f18e5b63Santirez client *slave = ln->value;
549f18e5b63Santirez
550f18e5b63Santirez if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
551f18e5b63Santirez replicationSetupSlaveForFullResync(slave,
552f18e5b63Santirez getPsyncInitialOffset());
553f18e5b63Santirez }
554f18e5b63Santirez }
555f18e5b63Santirez }
556f18e5b63Santirez
55775f0cd65Santirez /* Flush the script cache, since we need that slave differences are
55875f0cd65Santirez * accumulated without requiring slaves to match our cached scripts. */
55940eb548aSantirez if (retval == C_OK) replicationScriptCacheFlush();
56075f0cd65Santirez return retval;
56175f0cd65Santirez }
56275f0cd65Santirez
/* SYNC and PSYNC command implementation. */
syncCommand(client * c)564554bd0e7Santirez void syncCommand(client *c) {
5659d09ce39Sguiquanz /* ignore SYNC if already slave or in monitor mode */
56632f80e2fSantirez if (c->flags & CLIENT_SLAVE) return;
567e2641e09Santirez
568778b2210Santirez /* Refuse SYNC requests if we are a slave but the link with our master
569778b2210Santirez * is not ok... */
57032f80e2fSantirez if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
5713ab20376SPieter Noordhuis addReplyError(c,"Can't SYNC while not connected with my master");
572778b2210Santirez return;
573778b2210Santirez }
574778b2210Santirez
575e2641e09Santirez /* SYNC can't be issued when the server has pending data to send to
576e2641e09Santirez * the client about already issued commands. We need a fresh reply
577e2641e09Santirez * buffer registering the differences between the BGSAVE and the current
578e2641e09Santirez * dataset, so that we can copy to other slaves if needed. */
5792d21af45Santirez if (clientHasPendingReplies(c)) {
580d2a0348aSantirez addReplyError(c,"SYNC and PSYNC are invalid with pending output");
581e2641e09Santirez return;
582e2641e09Santirez }
583e2641e09Santirez
58432f80e2fSantirez serverLog(LL_NOTICE,"Slave %s asks for synchronization",
5854b8f4b90Santirez replicationGetSlaveName(c));
58607888202Santirez
58707888202Santirez /* Try a partial resynchronization if this is a PSYNC command.
58807888202Santirez * If it fails, we continue with usual full resynchronization, however
58907888202Santirez * when this happens masterTryPartialResynchronization() already
59007888202Santirez * replied with:
59107888202Santirez *
59207888202Santirez * +FULLRESYNC <runid> <offset>
59307888202Santirez *
59407888202Santirez * So the slave knows the new runid and offset to try a PSYNC later
59507888202Santirez * if the connection with the master is lost. */
59624f25836Santirez if (!strcasecmp(c->argv[0]->ptr,"psync")) {
59740eb548aSantirez if (masterTryPartialResynchronization(c) == C_OK) {
59824f25836Santirez server.stat_sync_partial_ok++;
59924f25836Santirez return; /* No full resync needed, return. */
60024f25836Santirez } else {
60124f25836Santirez char *master_runid = c->argv[1]->ptr;
60224f25836Santirez
60324f25836Santirez /* Increment stats for failed PSYNCs, but only if the
60424f25836Santirez * runid is not "?", as this is used by slaves to force a full
60524f25836Santirez * resync on purpose when they are not albe to partially
60624f25836Santirez * resync. */
60724f25836Santirez if (master_runid[0] != '?') server.stat_sync_partial_err++;
60824f25836Santirez }
6098ca265cdSantirez } else {
6108ca265cdSantirez /* If a slave uses SYNC, we are dealing with an old implementation
6118ca265cdSantirez * of the replication protocol (like redis-cli --slave). Flag the client
6128ca265cdSantirez * so that we don't expect to receive REPLCONF ACK feedbacks. */
61332f80e2fSantirez c->flags |= CLIENT_PRE_PSYNC;
61424f25836Santirez }
61524f25836Santirez
61624f25836Santirez /* Full resynchronization. */
61724f25836Santirez server.stat_sync_full++;
61807888202Santirez
619f18e5b63Santirez /* Setup the slave as one waiting for BGSAVE to start. The following code
620f18e5b63Santirez * paths will change the state if we handle the slave differently. */
621f18e5b63Santirez c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
622f18e5b63Santirez if (server.repl_disable_tcp_nodelay)
623f18e5b63Santirez anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
624f18e5b63Santirez c->repldbfd = -1;
625f18e5b63Santirez c->flags |= CLIENT_SLAVE;
626f18e5b63Santirez listAddNodeTail(server.slaves,c);
62762b5c60eSantirez
6283e6d4d59Santirez /* CASE 1: BGSAVE is in progress, with disk target. */
62975f0cd65Santirez if (server.rdb_child_pid != -1 &&
63032f80e2fSantirez server.rdb_child_type == RDB_CHILD_TYPE_DISK)
63175f0cd65Santirez {
632e2641e09Santirez /* Ok a background save is in progress. Let's check if it is a good
633e2641e09Santirez * one for replication, i.e. if there is another slave that is
63416546f5aSantirez * registering differences since the server forked to save. */
635554bd0e7Santirez client *slave;
636e2641e09Santirez listNode *ln;
637e2641e09Santirez listIter li;
638e2641e09Santirez
639e2641e09Santirez listRewind(server.slaves,&li);
640e2641e09Santirez while((ln = listNext(&li))) {
641e2641e09Santirez slave = ln->value;
64232f80e2fSantirez if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
643e2641e09Santirez }
6443e6d4d59Santirez /* To attach this slave, we check that it has at least all the
6453e6d4d59Santirez * capabilities of the slave that triggered the current BGSAVE. */
6463e6d4d59Santirez if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
647e2641e09Santirez /* Perfect, the server is already registering differences for
64875f0cd65Santirez * another slave. Set the right state, and copy the buffer. */
6491824e3a3Santirez copyClientOutputBuffer(c,slave);
65015de6b10Santirez replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
65132f80e2fSantirez serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
652e2641e09Santirez } else {
653e2641e09Santirez /* No way, we need to wait for the next BGSAVE in order to
65416546f5aSantirez * register differences. */
655017378ecSantirez serverLog(LL_NOTICE,"Can't attach the slave to the current BGSAVE. Waiting for next BGSAVE for SYNC");
656e2641e09Santirez }
65762b5c60eSantirez
6583e6d4d59Santirez /* CASE 2: BGSAVE is in progress, with socket target. */
65975f0cd65Santirez } else if (server.rdb_child_pid != -1 &&
66032f80e2fSantirez server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
66175f0cd65Santirez {
66275f0cd65Santirez /* There is an RDB child process but it is writing directly to
66375f0cd65Santirez * children sockets. We need to wait for the next BGSAVE
66475f0cd65Santirez * in order to synchronize. */
665017378ecSantirez serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
66662b5c60eSantirez
66762b5c60eSantirez /* CASE 3: There is no BGSAVE is progress. */
66875f0cd65Santirez } else {
6693e6d4d59Santirez if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
67075f0cd65Santirez /* Diskless replication RDB child is created inside
67175f0cd65Santirez * replicationCron() since we want to delay its start a
67275f0cd65Santirez * few seconds to wait for more slaves to arrive. */
673a27befc4Santirez if (server.repl_diskless_sync_delay)
674a1bfe22aSantirez serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
675e2641e09Santirez } else {
6763e6d4d59Santirez /* Target is disk (or the slave is not capable of supporting
6773e6d4d59Santirez * diskless replication) and we don't have a BGSAVE in progress,
67862b5c60eSantirez * let's start one. */
679*e67ad1d1SQu Chen if (server.aof_child_pid == -1) {
680a1bfe22aSantirez startBgsaveForReplication(c->slave_capa);
681a1bfe22aSantirez } else {
682a1bfe22aSantirez serverLog(LL_NOTICE,
683a1bfe22aSantirez "No BGSAVE in progress, but an AOF rewrite is active. "
684a1bfe22aSantirez "BGSAVE for replication delayed");
685a1bfe22aSantirez }
68675f0cd65Santirez }
687e2641e09Santirez }
688b70b459bSantirez
68907888202Santirez if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
69007888202Santirez createReplicationBacklog();
691e2641e09Santirez return;
692e2641e09Santirez }
693e2641e09Santirez
6943a328978Santirez /* REPLCONF <option> <value> <option> <value> ...
6953a328978Santirez * This command is used by a slave in order to configure the replication
6963a328978Santirez * process before starting it with the SYNC command.
6973a328978Santirez *
6983a328978Santirez * Currently the only use of this command is to communicate to the master
6993a328978Santirez * what is the listening port of the Slave redis instance, so that the
7003a328978Santirez * master can accurately list slaves and their listening ports in
7013a328978Santirez * the INFO output.
7023a328978Santirez *
7033a328978Santirez * In the future the same command can be used in order to configure
7043a328978Santirez * the replication to initiate an incremental replication instead of a
7053a328978Santirez * full resync. */
replconfCommand(client * c)706554bd0e7Santirez void replconfCommand(client *c) {
7073a328978Santirez int j;
7083a328978Santirez
7093a328978Santirez if ((c->argc % 2) == 0) {
7103a328978Santirez /* Number of arguments must be odd to make sure that every
7113a328978Santirez * option has a corresponding value. */
7123a328978Santirez addReply(c,shared.syntaxerr);
7133a328978Santirez return;
7143a328978Santirez }
7153a328978Santirez
7163a328978Santirez /* Process every option-value pair. */
7173a328978Santirez for (j = 1; j < c->argc; j+=2) {
7183a328978Santirez if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
7193a328978Santirez long port;
7203a328978Santirez
7213a328978Santirez if ((getLongFromObjectOrReply(c,c->argv[j+1],
72240eb548aSantirez &port,NULL) != C_OK))
7233a328978Santirez return;
7243a328978Santirez c->slave_listening_port = port;
7250a45fbc3Santirez } else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
7260a45fbc3Santirez sds ip = c->argv[j+1]->ptr;
7270a45fbc3Santirez if (sdslen(ip) < sizeof(c->slave_ip)) {
7280a45fbc3Santirez memcpy(c->slave_ip,ip,sdslen(ip)+1);
7290a45fbc3Santirez } else {
7300a45fbc3Santirez addReplyErrorFormat(c,"REPLCONF ip-address provided by "
7310a45fbc3Santirez "slave instance is too long: %zd bytes", sdslen(ip));
7320a45fbc3Santirez return;
7330a45fbc3Santirez }
7343e6d4d59Santirez } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
7353e6d4d59Santirez /* Ignore capabilities not understood by this master. */
7363e6d4d59Santirez if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
7373e6d4d59Santirez c->slave_capa |= SLAVE_CAPA_EOF;
7386b4635f4Santirez } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
7396b4635f4Santirez /* REPLCONF ACK is used by slave to inform the master the amount
7406b4635f4Santirez * of replication stream that it processed so far. It is an
7416b4635f4Santirez * internal only command that normal clients should never use. */
7426b4635f4Santirez long long offset;
7436b4635f4Santirez
74432f80e2fSantirez if (!(c->flags & CLIENT_SLAVE)) return;
74540eb548aSantirez if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
7466b4635f4Santirez return;
7476b4635f4Santirez if (offset > c->repl_ack_off)
7486b4635f4Santirez c->repl_ack_off = offset;
7496b4635f4Santirez c->repl_ack_time = server.unixtime;
750bb7fea0dSantirez /* If this was a diskless replication, we need to really put
751bb7fea0dSantirez * the slave online when the first ACK is received (which
752bb7fea0dSantirez * confirms slave is online and ready to get more data). */
75332f80e2fSantirez if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
754bb7fea0dSantirez putSlaveOnline(c);
7556b4635f4Santirez /* Note: this command does not reply anything! */
756dd0adbb7Santirez return;
757c5618e7fSantirez } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
758c5618e7fSantirez /* REPLCONF GETACK is used in order to request an ACK ASAP
759c5618e7fSantirez * to the slave. */
760c5618e7fSantirez if (server.masterhost && server.master) replicationSendAck();
761c5618e7fSantirez /* Note: this command does not reply anything! */
7623a328978Santirez } else {
7633a328978Santirez addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
7643a328978Santirez (char*)c->argv[j]->ptr);
7653a328978Santirez return;
7663a328978Santirez }
7673a328978Santirez }
7683a328978Santirez addReply(c,shared.ok);
7693a328978Santirez }
7703a328978Santirez
7713730d118Santirez /* This function puts a slave in the online state, and should be called just
7723730d118Santirez * after a slave received the RDB file for the initial synchronization, and
7733730d118Santirez * we are finally ready to send the incremental stream of commands.
7743730d118Santirez *
7753730d118Santirez * It does a few things:
7763730d118Santirez *
7776c60526dSantirez * 1) Put the slave in ONLINE state (useless when the function is called
7786c60526dSantirez * because state is already ONLINE but repl_put_online_on_ack is true).
7793730d118Santirez * 2) Make sure the writable event is re-installed, since calling the SYNC
7803730d118Santirez * command disables it, so that we can accumulate output buffer without
7813730d118Santirez * sending it to the slave.
7823730d118Santirez * 3) Update the count of good slaves. */
putSlaveOnline(client * slave)783554bd0e7Santirez void putSlaveOnline(client *slave) {
78432f80e2fSantirez slave->replstate = SLAVE_STATE_ONLINE;
785bb7fea0dSantirez slave->repl_put_online_on_ack = 0;
7866c60526dSantirez slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
7873730d118Santirez if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
7883730d118Santirez sendReplyToClient, slave) == AE_ERR) {
78932f80e2fSantirez serverLog(LL_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
7903730d118Santirez freeClient(slave);
7913730d118Santirez return;
7923730d118Santirez }
7933730d118Santirez refreshGoodSlavesCount();
79432f80e2fSantirez serverLog(LL_NOTICE,"Synchronization with slave %s succeeded",
795bb7fea0dSantirez replicationGetSlaveName(slave));
7963730d118Santirez }
7973730d118Santirez
/* Writable event handler used to transfer the RDB file to a slave.
 *
 * 'privdata' is the slave client and 'fd' is the descriptor to write to;
 * 'el' and 'mask' are unused. Every call sends either (part of) the
 * preamble or one chunk of at most PROTO_IOBUF_LEN bytes read from
 * slave->repldbfd at offset slave->repldboff. Once repldbsize bytes were
 * sent, the RDB descriptor is closed, this handler is removed and the slave
 * is put online. The client is freed on read errors and on write errors
 * (EAGAIN excluded for the bulk data). */
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *slave = privdata;
    char chunk[PROTO_IOBUF_LEN];
    ssize_t sent, loaded;
    UNUSED(el);
    UNUSED(mask);

    /* The preamble configured by the replication process goes first.
     * Currently it is just the bulk count of the file, in the form
     * "$<length>\r\n". */
    if (slave->replpreamble != NULL) {
        sent = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
        if (sent == -1) {
            serverLog(LL_VERBOSE,"Write error sending RDB preamble to slave: %s",
                strerror(errno));
            freeClient(slave);
            return;
        }
        server.stat_net_output_bytes += sent;

        /* Drop what was written. If something is left, it will be sent at
         * the next call. */
        sdsrange(slave->replpreamble,sent,-1);
        if (sdslen(slave->replpreamble) != 0) return;

        /* Preamble fully sent: release it and go on with the payload. */
        sdsfree(slave->replpreamble);
        slave->replpreamble = NULL;
    }

    /* Load the next chunk of the RDB file from the current offset. */
    lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
    loaded = read(slave->repldbfd,chunk,PROTO_IOBUF_LEN);
    if (loaded <= 0) {
        serverLog(LL_WARNING,"Read error sending DB to slave: %s",
            (loaded == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }

    sent = write(fd,chunk,loaded);
    if (sent == -1) {
        /* EAGAIN is not fatal: the same offset is retried at the next
         * call since repldboff was not advanced. */
        if (errno != EAGAIN) {
            serverLog(LL_WARNING,"Write error sending DB to slave: %s",
                strerror(errno));
            freeClient(slave);
        }
        return;
    }

    /* Advance only by what was actually written. */
    slave->repldboff += sent;
    server.stat_net_output_bytes += sent;
    if (slave->repldboff != slave->repldbsize) return;

    /* Transfer completed. */
    close(slave->repldbfd);
    slave->repldbfd = -1;
    aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
    putSlaveOnline(slave);
}
853e2641e09Santirez
85442951ab3Santirez /* This function is called at the end of every background saving,
85542951ab3Santirez * or when the replication RDB transfer strategy is modified from
85642951ab3Santirez * disk to socket or the other way around.
857e2641e09Santirez *
858e2641e09Santirez * The goal of this function is to handle slaves waiting for a successful
85975f0cd65Santirez * background saving in order to perform non-blocking synchronization, and
86075f0cd65Santirez * to schedule a new BGSAVE if there are slaves that attached while a
86175f0cd65Santirez * BGSAVE was in progress, but it was not a good one for replication (no
86242951ab3Santirez * other slave was accumulating differences).
86342951ab3Santirez *
86440eb548aSantirez * The argument bgsaveerr is C_OK if the background saving succeeded
86540eb548aSantirez * otherwise C_ERR is passed to the function.
86642951ab3Santirez * The 'type' argument is the type of the child that terminated
86742951ab3Santirez * (if it had a disk or socket target). */
updateSlavesWaitingBgsave(int bgsaveerr,int type)86875f0cd65Santirez void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
869e2641e09Santirez listNode *ln;
870e2641e09Santirez int startbgsave = 0;
8713e6d4d59Santirez int mincapa = -1;
872e2641e09Santirez listIter li;
873e2641e09Santirez
874e2641e09Santirez listRewind(server.slaves,&li);
875e2641e09Santirez while((ln = listNext(&li))) {
876554bd0e7Santirez client *slave = ln->value;
877e2641e09Santirez
87832f80e2fSantirez if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
879e2641e09Santirez startbgsave = 1;
8803e6d4d59Santirez mincapa = (mincapa == -1) ? slave->slave_capa :
8813e6d4d59Santirez (mincapa & slave->slave_capa);
88232f80e2fSantirez } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
883e2641e09Santirez struct redis_stat buf;
884e2641e09Santirez
8853730d118Santirez /* If this was an RDB on disk save, we have to prepare to send
8863730d118Santirez * the RDB from disk to the slave socket. Otherwise if this was
8873730d118Santirez * already an RDB -> Slaves socket transfer, used in the case of
8883730d118Santirez * diskless replication, our work is trivial, we can just put
8893730d118Santirez * the slave online. */
89032f80e2fSantirez if (type == RDB_CHILD_TYPE_SOCKET) {
89132f80e2fSantirez serverLog(LL_NOTICE,
892bb7fea0dSantirez "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
8934b8f4b90Santirez replicationGetSlaveName(slave));
894bb7fea0dSantirez /* Note: we wait for a REPLCONF ACK message from slave in
895bb7fea0dSantirez * order to really put it online (install the write handler
896bb7fea0dSantirez * so that the accumulated data can be transfered). However
897bb7fea0dSantirez * we change the replication state ASAP, since our slave
898bb7fea0dSantirez * is technically online now. */
89932f80e2fSantirez slave->replstate = SLAVE_STATE_ONLINE;
900bb7fea0dSantirez slave->repl_put_online_on_ack = 1;
9016c60526dSantirez slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
9023730d118Santirez } else {
90340eb548aSantirez if (bgsaveerr != C_OK) {
904e2641e09Santirez freeClient(slave);
90532f80e2fSantirez serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
906e2641e09Santirez continue;
907e2641e09Santirez }
908f48cd4b9Santirez if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
909e2641e09Santirez redis_fstat(slave->repldbfd,&buf) == -1) {
910e2641e09Santirez freeClient(slave);
91132f80e2fSantirez serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
912e2641e09Santirez continue;
913e2641e09Santirez }
914e2641e09Santirez slave->repldboff = 0;
915e2641e09Santirez slave->repldbsize = buf.st_size;
91632f80e2fSantirez slave->replstate = SLAVE_STATE_SEND_BULK;
91789ffba91Santirez slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
91889ffba91Santirez (unsigned long long) slave->repldbsize);
91989ffba91Santirez
920e2641e09Santirez aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
921e2641e09Santirez if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
922e2641e09Santirez freeClient(slave);
923e2641e09Santirez continue;
924e2641e09Santirez }
925e2641e09Santirez }
926e2641e09Santirez }
9273730d118Santirez }
928f18e5b63Santirez if (startbgsave) startBgsaveForReplication(mincapa);
929e2641e09Santirez }
930e2641e09Santirez
931f4aa600bSantirez /* ----------------------------------- SLAVE -------------------------------- */
932f4aa600bSantirez
933e3344b80Santirez /* Returns 1 if the given replication state is a handshake state,
934e3344b80Santirez * 0 otherwise. */
slaveIsInHandshakeState(void)935e3344b80Santirez int slaveIsInHandshakeState(void) {
936e3344b80Santirez return server.repl_state >= REPL_STATE_RECEIVE_PONG &&
937e3344b80Santirez server.repl_state <= REPL_STATE_RECEIVE_PSYNC;
938e3344b80Santirez }
939e3344b80Santirez
94011120689Santirez /* Avoid the master to detect the slave is timing out while loading the
94111120689Santirez * RDB file in initial synchronization. We send a single newline character
94211120689Santirez * that is valid protocol but is guaranteed to either be sent entierly or
94311120689Santirez * not, since the byte is indivisible.
94411120689Santirez *
94511120689Santirez * The function is called in two contexts: while we flush the current
94611120689Santirez * data with emptyDb(), and while we load the new data received as an
94711120689Santirez * RDB file from the master. */
replicationSendNewlineToMaster(void)94811120689Santirez void replicationSendNewlineToMaster(void) {
94911120689Santirez static time_t newline_sent;
95011120689Santirez if (time(NULL) != newline_sent) {
95111120689Santirez newline_sent = time(NULL);
95211120689Santirez if (write(server.repl_transfer_s,"\n",1) == -1) {
95311120689Santirez /* Pinging back in this stage is best-effort. */
95411120689Santirez }
95511120689Santirez }
95611120689Santirez }
95711120689Santirez
/* Progress callback handed to emptyDb() when the old dataset is flushed
 * away in order to load the one received from the master. It just pings
 * the master with a newline; 'privdata' is not used. */
void replicationEmptyDbCallback(void *privdata) {
    UNUSED(privdata);

    replicationSendNewlineToMaster();
}
96411120689Santirez
965c5dd686eSantirez /* Once we have a link with the master and the synchroniziation was
966c5dd686eSantirez * performed, this function materializes the master client we store
967c5dd686eSantirez * at server.master, starting from the specified file descriptor. */
replicationCreateMasterClient(int fd)968c5dd686eSantirez void replicationCreateMasterClient(int fd) {
969c5dd686eSantirez server.master = createClient(fd);
97032f80e2fSantirez server.master->flags |= CLIENT_MASTER;
971c5dd686eSantirez server.master->authenticated = 1;
97232f80e2fSantirez server.repl_state = REPL_STATE_CONNECTED;
973c5dd686eSantirez server.master->reploff = server.repl_master_initial_offset;
974c5dd686eSantirez memcpy(server.master->replrunid, server.repl_master_runid,
975c5dd686eSantirez sizeof(server.repl_master_runid));
976c5dd686eSantirez /* If master offset is set to -1, this master is old and is not
977c5dd686eSantirez * PSYNC capable, so we flag it accordingly. */
978c5dd686eSantirez if (server.master->reploff == -1)
97932f80e2fSantirez server.master->flags |= CLIENT_PRE_PSYNC;
980c5dd686eSantirez }
981c5dd686eSantirez
982f4aa600bSantirez /* Asynchronously read the SYNC payload we receive from a master */
983784b9308Santirez #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
readSyncBulkPayload(aeEventLoop * el,int fd,void * privdata,int mask)984f4aa600bSantirez void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
98526b33669Santirez char buf[4096];
98662ec599cSantirez ssize_t nread, readlen;
987784b9308Santirez off_t left;
98832f80e2fSantirez UNUSED(el);
98932f80e2fSantirez UNUSED(privdata);
99032f80e2fSantirez UNUSED(mask);
991f4aa600bSantirez
9925ee2ccf4Santirez /* Static vars used to hold the EOF mark, and the last bytes received
9935ee2ccf4Santirez * form the server: when they match, we reached the end of the transfer. */
99432f80e2fSantirez static char eofmark[CONFIG_RUN_ID_SIZE];
99532f80e2fSantirez static char lastbytes[CONFIG_RUN_ID_SIZE];
9965ee2ccf4Santirez static int usemark = 0;
9975ee2ccf4Santirez
998784b9308Santirez /* If repl_transfer_size == -1 we still have to read the bulk length
99926b33669Santirez * from the master reply. */
1000784b9308Santirez if (server.repl_transfer_size == -1) {
10019157549fSantirez if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
100232f80e2fSantirez serverLog(LL_WARNING,
100326b33669Santirez "I/O error reading bulk count from MASTER: %s",
100426b33669Santirez strerror(errno));
1005b075621fSPieter Noordhuis goto error;
100626b33669Santirez }
1007b075621fSPieter Noordhuis
100826b33669Santirez if (buf[0] == '-') {
100932f80e2fSantirez serverLog(LL_WARNING,
101026b33669Santirez "MASTER aborted replication with an error: %s",
101126b33669Santirez buf+1);
1012b075621fSPieter Noordhuis goto error;
101389a1433eSantirez } else if (buf[0] == '\0') {
101489a1433eSantirez /* At this stage just a newline works as a PING in order to take
101589a1433eSantirez * the connection live. So we refresh our last interaction
101689a1433eSantirez * timestamp. */
1017d1949054SPremysl Hruby server.repl_transfer_lastio = server.unixtime;
101889a1433eSantirez return;
101926b33669Santirez } else if (buf[0] != '$') {
102032f80e2fSantirez serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
1021b075621fSPieter Noordhuis goto error;
102226b33669Santirez }
10235ee2ccf4Santirez
10245ee2ccf4Santirez /* There are two possible forms for the bulk payload. One is the
10255ee2ccf4Santirez * usual $<count> bulk format. The other is used for diskless transfers
10265ee2ccf4Santirez * when the master does not know beforehand the size of the file to
10275ee2ccf4Santirez * transfer. In the latter case, the following format is used:
10285ee2ccf4Santirez *
10295ee2ccf4Santirez * $EOF:<40 bytes delimiter>
10305ee2ccf4Santirez *
10315ee2ccf4Santirez * At the end of the file the announced delimiter is transmitted. The
10325ee2ccf4Santirez * delimiter is long and random enough that the probability of a
10335ee2ccf4Santirez * collision with the actual file content can be ignored. */
103432f80e2fSantirez if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
10355ee2ccf4Santirez usemark = 1;
103632f80e2fSantirez memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
103732f80e2fSantirez memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
103880f7f63bSantirez /* Set any repl_transfer_size to avoid entering this code path
103980f7f63bSantirez * at the next call. */
104080f7f63bSantirez server.repl_transfer_size = 0;
104132f80e2fSantirez serverLog(LL_NOTICE,
10425ee2ccf4Santirez "MASTER <-> SLAVE sync: receiving streamed RDB from master");
10435ee2ccf4Santirez } else {
10445ee2ccf4Santirez usemark = 0;
1045784b9308Santirez server.repl_transfer_size = strtol(buf+1,NULL,10);
104632f80e2fSantirez serverLog(LL_NOTICE,
1047f9b5ca29Santirez "MASTER <-> SLAVE sync: receiving %lld bytes from master",
1048f9b5ca29Santirez (long long) server.repl_transfer_size);
10495ee2ccf4Santirez }
105026b33669Santirez return;
105126b33669Santirez }
105226b33669Santirez
105326b33669Santirez /* Read bulk data */
10545ee2ccf4Santirez if (usemark) {
10550c5a06f6Santirez readlen = sizeof(buf);
10560c5a06f6Santirez } else {
1057784b9308Santirez left = server.repl_transfer_size - server.repl_transfer_read;
1058784b9308Santirez readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
10595ee2ccf4Santirez }
10605ee2ccf4Santirez
1061f4aa600bSantirez nread = read(fd,buf,readlen);
1062f4aa600bSantirez if (nread <= 0) {
106332f80e2fSantirez serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
1064f4aa600bSantirez (nread == -1) ? strerror(errno) : "connection lost");
1065c5f8c80aSantirez cancelReplicationHandshake();
1066f4aa600bSantirez return;
1067f4aa600bSantirez }
10681b732c09Santirez server.stat_net_input_bytes += nread;
10695ee2ccf4Santirez
10705ee2ccf4Santirez /* When a mark is used, we want to detect EOF asap in order to avoid
10715ee2ccf4Santirez * writing the EOF mark into the file... */
10725ee2ccf4Santirez int eof_reached = 0;
10735ee2ccf4Santirez
10745ee2ccf4Santirez if (usemark) {
10755ee2ccf4Santirez /* Update the last bytes array, and check if it matches our delimiter.*/
107632f80e2fSantirez if (nread >= CONFIG_RUN_ID_SIZE) {
107732f80e2fSantirez memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
10785ee2ccf4Santirez } else {
107932f80e2fSantirez int rem = CONFIG_RUN_ID_SIZE-nread;
10805ee2ccf4Santirez memmove(lastbytes,lastbytes+nread,rem);
10815ee2ccf4Santirez memcpy(lastbytes+rem,buf,nread);
10825ee2ccf4Santirez }
108332f80e2fSantirez if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
10845ee2ccf4Santirez }
10855ee2ccf4Santirez
1086d1949054SPremysl Hruby server.repl_transfer_lastio = server.unixtime;
1087f4aa600bSantirez if (write(server.repl_transfer_fd,buf,nread) != nread) {
108832f80e2fSantirez serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
1089b075621fSPieter Noordhuis goto error;
1090f4aa600bSantirez }
1091784b9308Santirez server.repl_transfer_read += nread;
1092784b9308Santirez
109325a3d996Santirez /* Delete the last 40 bytes from the file if we reached EOF. */
109425a3d996Santirez if (usemark && eof_reached) {
109525a3d996Santirez if (ftruncate(server.repl_transfer_fd,
109632f80e2fSantirez server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
109725a3d996Santirez {
109832f80e2fSantirez serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
109925a3d996Santirez goto error;
110025a3d996Santirez }
110125a3d996Santirez }
110225a3d996Santirez
1103784b9308Santirez /* Sync data on disk from time to time, otherwise at the end of the transfer
1104784b9308Santirez * we may suffer a big delay as the memory buffers are copied into the
1105784b9308Santirez * actual disk. */
1106784b9308Santirez if (server.repl_transfer_read >=
1107784b9308Santirez server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
1108784b9308Santirez {
1109784b9308Santirez off_t sync_size = server.repl_transfer_read -
1110784b9308Santirez server.repl_transfer_last_fsync_off;
1111784b9308Santirez rdb_fsync_range(server.repl_transfer_fd,
1112784b9308Santirez server.repl_transfer_last_fsync_off, sync_size);
1113784b9308Santirez server.repl_transfer_last_fsync_off += sync_size;
1114784b9308Santirez }
1115784b9308Santirez
1116f4aa600bSantirez /* Check if the transfer is now complete */
11175ee2ccf4Santirez if (!usemark) {
11185ee2ccf4Santirez if (server.repl_transfer_read == server.repl_transfer_size)
11195ee2ccf4Santirez eof_reached = 1;
11205ee2ccf4Santirez }
11215ee2ccf4Santirez
11225ee2ccf4Santirez if (eof_reached) {
1123f48cd4b9Santirez if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
112432f80e2fSantirez serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
1125c5f8c80aSantirez cancelReplicationHandshake();
1126f4aa600bSantirez return;
1127f4aa600bSantirez }
112832f80e2fSantirez serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
1129a6c2f901Santirez signalFlushedDb(-1);
113011120689Santirez emptyDb(replicationEmptyDbCallback);
11319fd01051Santirez /* Before loading the DB into memory we need to delete the readable
11329fd01051Santirez * handler, otherwise it will get called recursively since
11339fd01051Santirez * rdbLoad() will call the event loop to process events from time to
11349fd01051Santirez * time for non blocking loading. */
11359fd01051Santirez aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
113632f80e2fSantirez serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
113740eb548aSantirez if (rdbLoad(server.rdb_filename) != C_OK) {
113832f80e2fSantirez serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
1139c5f8c80aSantirez cancelReplicationHandshake();
1140f4aa600bSantirez return;
1141f4aa600bSantirez }
1142f4aa600bSantirez /* Final setup of the connected slave <- master link */
1143f4aa600bSantirez zfree(server.repl_transfer_tmpfile);
1144f4aa600bSantirez close(server.repl_transfer_fd);
1145c5dd686eSantirez replicationCreateMasterClient(server.repl_transfer_s);
114632f80e2fSantirez serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
1147e7a2e7c1Santirez /* Restart the AOF subsystem now that we finished the sync. This
1148e7a2e7c1Santirez * will trigger an AOF rewrite, and when done will start appending
1149e7a2e7c1Santirez * to the new file. */
115032f80e2fSantirez if (server.aof_state != AOF_OFF) {
1151e7a2e7c1Santirez int retry = 10;
1152e7a2e7c1Santirez
1153e7a2e7c1Santirez stopAppendOnly();
115440eb548aSantirez while (retry-- && startAppendOnly() == C_ERR) {
115532f80e2fSantirez serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
1156e7a2e7c1Santirez sleep(1);
1157e7a2e7c1Santirez }
1158e7a2e7c1Santirez if (!retry) {
115932f80e2fSantirez serverLog(LL_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
1160e7a2e7c1Santirez exit(1);
1161e7a2e7c1Santirez }
1162e7a2e7c1Santirez }
1163f4aa600bSantirez }
1164b075621fSPieter Noordhuis
1165b075621fSPieter Noordhuis return;
1166b075621fSPieter Noordhuis
1167b075621fSPieter Noordhuis error:
1168c5f8c80aSantirez cancelReplicationHandshake();
1169b075621fSPieter Noordhuis return;
1170f4aa600bSantirez }
1171f4aa600bSantirez
11723a328978Santirez /* Send a synchronous command to the master. Used to send AUTH and
117336def8fdSantirez * REPLCONF commands before starting the replication with SYNC.
11743a328978Santirez *
117507888202Santirez * The command returns an sds string representing the result of the
117607888202Santirez * operation. On error the first byte is a "-".
11773a328978Santirez */
117888c716a0Santirez #define SYNC_CMD_READ (1<<0)
117988c716a0Santirez #define SYNC_CMD_WRITE (1<<1)
118088c716a0Santirez #define SYNC_CMD_FULL (SYNC_CMD_READ|SYNC_CMD_WRITE)
sendSynchronousCommand(int flags,int fd,...)118188c716a0Santirez char *sendSynchronousCommand(int flags, int fd, ...) {
11823a328978Santirez
11833a328978Santirez /* Create the command to send to the master, we use simple inline
11843a328978Santirez * protocol for simplicity as currently we only send simple strings. */
118588c716a0Santirez if (flags & SYNC_CMD_WRITE) {
118688c716a0Santirez char *arg;
118788c716a0Santirez va_list ap;
118888c716a0Santirez sds cmd = sdsempty();
11893a328978Santirez va_start(ap,fd);
119088c716a0Santirez
11913a328978Santirez while(1) {
11923a328978Santirez arg = va_arg(ap, char*);
11933a328978Santirez if (arg == NULL) break;
11943a328978Santirez
11953a328978Santirez if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
11963a328978Santirez cmd = sdscat(cmd,arg);
11973a328978Santirez }
11983a328978Santirez cmd = sdscatlen(cmd,"\r\n",2);
11993a328978Santirez
12003a328978Santirez /* Transfer command to the server. */
120188c716a0Santirez if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000)
120288c716a0Santirez == -1)
120388c716a0Santirez {
12043a328978Santirez sdsfree(cmd);
120507888202Santirez return sdscatprintf(sdsempty(),"-Writing to master: %s",
12063a328978Santirez strerror(errno));
12073a328978Santirez }
12083a328978Santirez sdsfree(cmd);
120988c716a0Santirez va_end(ap);
121088c716a0Santirez }
12113a328978Santirez
12123a328978Santirez /* Read the reply from the server. */
121388c716a0Santirez if (flags & SYNC_CMD_READ) {
121488c716a0Santirez char buf[256];
121588c716a0Santirez
121688c716a0Santirez if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000)
121788c716a0Santirez == -1)
12183a328978Santirez {
121907888202Santirez return sdscatprintf(sdsempty(),"-Reading from master: %s",
12203a328978Santirez strerror(errno));
12213a328978Santirez }
1222e3344b80Santirez server.repl_transfer_lastio = server.unixtime;
122307888202Santirez return sdsnew(buf);
12243a328978Santirez }
122588c716a0Santirez return NULL;
122688c716a0Santirez }
12273a328978Santirez
122807888202Santirez /* Try a partial resynchronization with the master if we are about to reconnect.
122907888202Santirez * If there is no cached master structure, at least try to issue a
123007888202Santirez * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
123107888202Santirez * command in order to obtain the master run id and the master replication
123207888202Santirez * global offset.
123307888202Santirez *
123407888202Santirez * This function is designed to be called from syncWithMaster(), so the
123507888202Santirez * following assumptions are made:
123607888202Santirez *
123707888202Santirez * 1) We pass the function an already connected socket "fd".
123807888202Santirez * 2) This function does not close the file descriptor "fd". However in case
123907888202Santirez * of successful partial resynchronization, the function will reuse
124007888202Santirez * 'fd' as file descriptor of the server.master client structure.
124107888202Santirez *
124288c716a0Santirez * The function is split in two halves: if read_reply is 0, the function
124388c716a0Santirez * writes the PSYNC command on the socket, and a new function call is
124488c716a0Santirez * needed, with read_reply set to 1, in order to read the reply of the
124588c716a0Santirez * command. This is useful in order to support non blocking operations, so
124688c716a0Santirez * that we write, return into the event loop, and read when there are data.
124788c716a0Santirez *
124888c716a0Santirez * When read_reply is 0 the function returns PSYNC_WRITE_ERR if there
124988c716a0Santirez * was a write error, or PSYNC_WAIT_REPLY to signal we need another call
125088c716a0Santirez * with read_reply set to 1. However even when read_reply is set to 1
125188c716a0Santirez * the function may return PSYNC_WAIT_REPLY again to signal there were
125288c716a0Santirez * insufficient data to read to complete its work. We should re-enter
125388c716a0Santirez * into the event loop and wait in such a case.
125488c716a0Santirez *
125507888202Santirez * The function returns:
125607888202Santirez *
125707888202Santirez * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue.
125807888202Santirez * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
125907888202Santirez * In this case the master run_id and global replication
126007888202Santirez * offset is saved.
126107888202Santirez * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
126207888202Santirez * the caller should fall back to SYNC.
126388c716a0Santirez * PSYNC_WRITE_ERR: There was an error writing the command to the socket.
126488c716a0Santirez * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
1265bea12591Santirez *
1266bea12591Santirez * Notable side effects:
1267bea12591Santirez *
1268bea12591Santirez * 1) As a side effect of the function call the function removes the readable
1269bea12591Santirez * event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
1270bea12591Santirez * 2) server.repl_master_initial_offset is set to the right value according
1271bea12591Santirez * to the master reply. This will be used to populate the 'server.master'
1272bea12591Santirez * structure replication offset.
127307888202Santirez */
127407888202Santirez
127588c716a0Santirez #define PSYNC_WRITE_ERROR 0
127688c716a0Santirez #define PSYNC_WAIT_REPLY 1
127788c716a0Santirez #define PSYNC_CONTINUE 2
127888c716a0Santirez #define PSYNC_FULLRESYNC 3
127988c716a0Santirez #define PSYNC_NOT_SUPPORTED 4
slaveTryPartialResynchronization(int fd,int read_reply)128088c716a0Santirez int slaveTryPartialResynchronization(int fd, int read_reply) {
128107888202Santirez char *psync_runid;
128207888202Santirez char psync_offset[32];
128307888202Santirez sds reply;
128407888202Santirez
128588c716a0Santirez /* Writing half */
128688c716a0Santirez if (!read_reply) {
128707888202Santirez /* Initially set repl_master_initial_offset to -1 to mark the current
128807888202Santirez * master run_id and offset as not valid. Later if we'll be able to do
128907888202Santirez * a FULL resync using the PSYNC command we'll set the offset at the
129007888202Santirez * right value, so that this information will be propagated to the
129107888202Santirez * client structure representing the master into server.master. */
129207888202Santirez server.repl_master_initial_offset = -1;
129307888202Santirez
129407888202Santirez if (server.cached_master) {
129507888202Santirez psync_runid = server.cached_master->replrunid;
129607888202Santirez snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
129732f80e2fSantirez serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
129807888202Santirez } else {
129932f80e2fSantirez serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
130007888202Santirez psync_runid = "?";
130107888202Santirez memcpy(psync_offset,"-1",3);
130207888202Santirez }
130307888202Santirez
130407888202Santirez /* Issue the PSYNC command */
130588c716a0Santirez reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL);
130688c716a0Santirez if (reply != NULL) {
130788c716a0Santirez serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
130888c716a0Santirez sdsfree(reply);
1309bea12591Santirez aeDeleteFileEvent(server.el,fd,AE_READABLE);
131088c716a0Santirez return PSYNC_WRITE_ERROR;
131188c716a0Santirez }
131288c716a0Santirez return PSYNC_WAIT_REPLY;
131388c716a0Santirez }
131488c716a0Santirez
131588c716a0Santirez /* Reading half */
131688c716a0Santirez reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
131788c716a0Santirez if (sdslen(reply) == 0) {
131888c716a0Santirez /* The master may send empty newlines after it receives PSYNC
131988c716a0Santirez * and before to reply, just to keep the connection alive. */
132088c716a0Santirez sdsfree(reply);
132188c716a0Santirez return PSYNC_WAIT_REPLY;
132288c716a0Santirez }
132307888202Santirez
1324bea12591Santirez aeDeleteFileEvent(server.el,fd,AE_READABLE);
1325bea12591Santirez
132607888202Santirez if (!strncmp(reply,"+FULLRESYNC",11)) {
132789b48f08Santirez char *runid = NULL, *offset = NULL;
132807888202Santirez
132907888202Santirez /* FULL RESYNC, parse the reply in order to extract the run id
133007888202Santirez * and the replication offset. */
133107888202Santirez runid = strchr(reply,' ');
133207888202Santirez if (runid) {
133307888202Santirez runid++;
133407888202Santirez offset = strchr(runid,' ');
133507888202Santirez if (offset) offset++;
133607888202Santirez }
133732f80e2fSantirez if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) {
133832f80e2fSantirez serverLog(LL_WARNING,
133907888202Santirez "Master replied with wrong +FULLRESYNC syntax.");
13400e1be534Santirez /* This is an unexpected condition, actually the +FULLRESYNC
13410e1be534Santirez * reply means that the master supports PSYNC, but the reply
13420e1be534Santirez * format seems wrong. To stay safe we blank the master
1343072c91feSantirez * runid to make sure next PSYNCs will fail. */
134432f80e2fSantirez memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1);
134507888202Santirez } else {
134607888202Santirez memcpy(server.repl_master_runid, runid, offset-runid-1);
134732f80e2fSantirez server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0';
134807888202Santirez server.repl_master_initial_offset = strtoll(offset,NULL,10);
134932f80e2fSantirez serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
135007888202Santirez server.repl_master_runid,
135107888202Santirez server.repl_master_initial_offset);
135207888202Santirez }
135307888202Santirez /* We are going to full resync, discard the cached master structure. */
135407888202Santirez replicationDiscardCachedMaster();
135507888202Santirez sdsfree(reply);
135607888202Santirez return PSYNC_FULLRESYNC;
135707888202Santirez }
135807888202Santirez
135907888202Santirez if (!strncmp(reply,"+CONTINUE",9)) {
136007888202Santirez /* Partial resync was accepted, set the replication state accordingly */
136132f80e2fSantirez serverLog(LL_NOTICE,
136207888202Santirez "Successful partial resynchronization with master.");
136307888202Santirez sdsfree(reply);
136407888202Santirez replicationResurrectCachedMaster(fd);
136507888202Santirez return PSYNC_CONTINUE;
136607888202Santirez }
136707888202Santirez
136888c716a0Santirez /* If we reach this point we received either an error since the master does
136907888202Santirez * not understand PSYNC, or an unexpected reply from the master.
13703f92e056Santirez * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
137107888202Santirez
137207888202Santirez if (strncmp(reply,"-ERR",4)) {
137307888202Santirez /* If it's not an error, log the unexpected event. */
137432f80e2fSantirez serverLog(LL_WARNING,
137507888202Santirez "Unexpected reply to PSYNC from master: %s", reply);
137607888202Santirez } else {
137732f80e2fSantirez serverLog(LL_NOTICE,
137807888202Santirez "Master does not support PSYNC or is in "
137907888202Santirez "error state (reply: %s)", reply);
138007888202Santirez }
138107888202Santirez sdsfree(reply);
138207888202Santirez replicationDiscardCachedMaster();
138307888202Santirez return PSYNC_NOT_SUPPORTED;
13843a328978Santirez }
13853a328978Santirez
/* File event handler driving the non blocking replication handshake with
 * the master. It is installed on the master socket by connectWithMaster()
 * and is called again by the event loop for every step of the handshake,
 * using server.repl_state to remember where we are:
 *
 * CONNECTING -> RECEIVE_PONG -> SEND_AUTH [-> RECEIVE_AUTH] ->
 * SEND_PORT -> RECEIVE_PORT -> SEND_IP [-> RECEIVE_IP] ->
 * SEND_CAPA -> RECEIVE_CAPA -> SEND_PSYNC -> RECEIVE_PSYNC
 *
 * Each "SEND" step writes a command and returns to the event loop, the
 * matching "RECEIVE" step reads the reply and falls through to the next
 * "SEND" step. Once the PSYNC reply is available the function either
 * returns (partial resync accepted) or prepares the temp file and installs
 * readSyncBulkPayload() as readable handler, entering REPL_STATE_TRANSFER.
 *
 * On any error the socket is closed, the event handlers are removed and
 * the state is set back to REPL_STATE_CONNECT.
 *
 * 'fd' is the master socket. 'el', 'privdata' and 'mask' are unused. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err = NULL;
    int dfd = -1, maxtries = 5; /* dfd == -1 means "no temp file to undo". */
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REPL_STATE_NONE) {
        close(fd);
        return;
    }

    /* Check for errors in the socket. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }

    /* Send a PING to check the master is able to reply without errors. */
    if (server.repl_state == REPL_STATE_CONNECTING) {
        serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REPL_STATE_RECEIVE_PONG;
        /* Send the PING. A write error aborts the handshake immediately,
         * a missing reply is handled by the replication timeout. */
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
        if (err) goto write_error;
        return;
    }

    /* Receive the PONG command. */
    if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (err[0] != '+' &&
            strncmp(err,"-NOAUTH",7) != 0 &&
            strncmp(err,"-ERR operation not permitted",28) != 0)
        {
            serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
            sdsfree(err);
            goto error;
        } else {
            serverLog(LL_NOTICE,
                "Master replied to PING, replication can continue...");
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_AUTH;
    }

    /* AUTH with the master if required. */
    if (server.repl_state == REPL_STATE_SEND_AUTH) {
        if (server.masterauth) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
            if (err) goto write_error;
            server.repl_state = REPL_STATE_RECEIVE_AUTH;
            return;
        } else {
            server.repl_state = REPL_STATE_SEND_PORT;
        }
    }

    /* Receive AUTH reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        if (err[0] == '-') {
            serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PORT;
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    if (server.repl_state == REPL_STATE_SEND_PORT) {
        sds port = sdsfromlonglong(server.slave_announce_port ?
            server.slave_announce_port : server.port);
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "listening-port",port, NULL);
        sdsfree(port);
        if (err) goto write_error;
        server.repl_state = REPL_STATE_RECEIVE_PORT;
        return;
    }

    /* Receive REPLCONF listening-port reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_IP;
    }

    /* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
    if (server.repl_state == REPL_STATE_SEND_IP &&
        server.slave_announce_ip == NULL)
    {
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* Set the slave ip, so that Master's INFO command can list the
     * slave IP address port correctly in case of port forwarding or NAT. */
    if (server.repl_state == REPL_STATE_SEND_IP) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "ip-address",server.slave_announce_ip, NULL);
        if (err) goto write_error;
        server.repl_state = REPL_STATE_RECEIVE_IP;
        return;
    }

    /* Receive REPLCONF ip-address reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_IP) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF ip-address. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF ip-address: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* Inform the master of our capabilities. While we currently send
     * just one capability, it is possible to chain new capabilities here
     * in the form of REPLCONF capa X capa Y capa Z ...
     * The master will ignore capabilities it does not understand. */
    if (server.repl_state == REPL_STATE_SEND_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "capa","eof",NULL);
        if (err) goto write_error;
        server.repl_state = REPL_STATE_RECEIVE_CAPA;
        return;
    }

    /* Receive CAPA reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF capa. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                  "REPLCONF capa: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PSYNC;
    }

    /* Try a partial resynchronization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
        if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
            err = sdsnew("Write error sending the PSYNC command.");
            goto write_error;
        }
        server.repl_state = REPL_STATE_RECEIVE_PSYNC;
        return;
    }

    /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
    if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
        serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
                             "state should be RECEIVE_PSYNC but is %d",
                             server.repl_state);
        goto error;
    }

    psync_result = slaveTryPartialResynchronization(fd,1);
    if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

    /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
     * uninstalling the read handler from the file descriptor. */

    if (psync_result == PSYNC_CONTINUE) {
        serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* PSYNC failed or is not supported: we want our slaves to resync with us
     * as well, if we have any (chained replication case). The master may
     * transfer us an entirely different data set and we have no way to
     * incrementally feed our slaves after that. */
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        serverLog(LL_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,sizeof(tmpfile),
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        serverLog(LL_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    /* From now on the temp file is owned by the server state and is
     * released by the code handling REPL_STATE_TRANSFER. */
    server.repl_state = REPL_STATE_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    /* The temp file exists only if we failed after creating it and before
     * handing it over to the server state: close and remove it, otherwise
     * both the descriptor and the file on disk would be leaked. */
    if (dfd != -1) {
        close(dfd);
        unlink(tmpfile);
    }
    aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REPL_STATE_CONNECT;
    return;

write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */
    serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
    sdsfree(err);
    goto error;
}
1653a3309139SPieter Noordhuis
connectWithMaster(void)1654a3309139SPieter Noordhuis int connectWithMaster(void) {
1655a3309139SPieter Noordhuis int fd;
1656a3309139SPieter Noordhuis
16578366907bSantirez fd = anetTcpNonBlockBestEffortBindConnect(NULL,
165832f80e2fSantirez server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
1659a3309139SPieter Noordhuis if (fd == -1) {
166032f80e2fSantirez serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
1661a3309139SPieter Noordhuis strerror(errno));
166240eb548aSantirez return C_ERR;
1663a3309139SPieter Noordhuis }
1664a3309139SPieter Noordhuis
166545029d37Santirez if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
1666b075621fSPieter Noordhuis AE_ERR)
1667a3309139SPieter Noordhuis {
1668e2641e09Santirez close(fd);
166932f80e2fSantirez serverLog(LL_WARNING,"Can't create readable event for SYNC");
167040eb548aSantirez return C_ERR;
1671e2641e09Santirez }
1672a3309139SPieter Noordhuis
1673d1949054SPremysl Hruby server.repl_transfer_lastio = server.unixtime;
1674f4aa600bSantirez server.repl_transfer_s = fd;
167532f80e2fSantirez server.repl_state = REPL_STATE_CONNECTING;
167640eb548aSantirez return C_OK;
1677e2641e09Santirez }
1678e2641e09Santirez
167927acd7aaSantirez /* This function can be called when a non blocking connection is currently
1680c5f8c80aSantirez * in progress to undo it.
1681c5f8c80aSantirez * Never call this function directly, use cancelReplicationHandshake() instead.
1682c5f8c80aSantirez */
undoConnectWithMaster(void)168327acd7aaSantirez void undoConnectWithMaster(void) {
168427acd7aaSantirez int fd = server.repl_transfer_s;
168527acd7aaSantirez
168627acd7aaSantirez aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
168727acd7aaSantirez close(fd);
168827acd7aaSantirez server.repl_transfer_s = -1;
1689c5f8c80aSantirez }
1690c5f8c80aSantirez
1691c5f8c80aSantirez /* Abort the async download of the bulk dataset while SYNC-ing with master.
1692c5f8c80aSantirez * Never call this function directly, use cancelReplicationHandshake() instead.
1693c5f8c80aSantirez */
replicationAbortSyncTransfer(void)1694c5f8c80aSantirez void replicationAbortSyncTransfer(void) {
1695c5f8c80aSantirez serverAssert(server.repl_state == REPL_STATE_TRANSFER);
1696c5f8c80aSantirez undoConnectWithMaster();
1697c5f8c80aSantirez close(server.repl_transfer_fd);
1698c5f8c80aSantirez unlink(server.repl_transfer_tmpfile);
1699c5f8c80aSantirez zfree(server.repl_transfer_tmpfile);
170027acd7aaSantirez }
170127acd7aaSantirez
1702ef99e146Santirez /* This function aborts a non blocking replication attempt if there is one
1703ef99e146Santirez * in progress, by canceling the non-blocking connect attempt or
1704ef99e146Santirez * the initial bulk transfer.
1705ef99e146Santirez *
1706ef99e146Santirez * If there was a replication handshake in progress 1 is returned and
170732f80e2fSantirez * the replication state (server.repl_state) set to REPL_STATE_CONNECT.
1708ef99e146Santirez *
1709ef99e146Santirez * Otherwise zero is returned and no operation is perforemd at all. */
cancelReplicationHandshake(void)1710ef99e146Santirez int cancelReplicationHandshake(void) {
171132f80e2fSantirez if (server.repl_state == REPL_STATE_TRANSFER) {
1712ef99e146Santirez replicationAbortSyncTransfer();
1713c5f8c80aSantirez server.repl_state = REPL_STATE_CONNECT;
171432f80e2fSantirez } else if (server.repl_state == REPL_STATE_CONNECTING ||
1715e3344b80Santirez slaveIsInHandshakeState())
1716ef99e146Santirez {
1717ef99e146Santirez undoConnectWithMaster();
1718c5f8c80aSantirez server.repl_state = REPL_STATE_CONNECT;
1719ef99e146Santirez } else {
1720ef99e146Santirez return 0;
1721ef99e146Santirez }
1722ef99e146Santirez return 1;
1723ef99e146Santirez }
1724ef99e146Santirez
17257bead003Santirez /* Set replication to the specified master address and port. */
replicationSetMaster(char * ip,int port)17267bead003Santirez void replicationSetMaster(char *ip, int port) {
17277bead003Santirez sdsfree(server.masterhost);
17283be89312Santirez server.masterhost = sdsnew(ip);
17297bead003Santirez server.masterport = port;
17307bead003Santirez if (server.master) freeClient(server.master);
1731c3ad7090Santirez disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
17327bead003Santirez disconnectSlaves(); /* Force our slaves to resync with us as well. */
17337bead003Santirez replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
17347bead003Santirez freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
17357bead003Santirez cancelReplicationHandshake();
173632f80e2fSantirez server.repl_state = REPL_STATE_CONNECT;
173794e8c9e7Santirez server.master_repl_offset = 0;
1738abd6308dSantirez server.repl_down_since = 0;
17397bead003Santirez }
17407bead003Santirez
17417bead003Santirez /* Cancel replication, setting the instance as a master itself. */
replicationUnsetMaster(void)17427bead003Santirez void replicationUnsetMaster(void) {
17437bead003Santirez if (server.masterhost == NULL) return; /* Nothing to do. */
1744e2641e09Santirez sdsfree(server.masterhost);
1745e2641e09Santirez server.masterhost = NULL;
174694e8c9e7Santirez if (server.master) {
174794e8c9e7Santirez if (listLength(server.slaves) == 0) {
174894e8c9e7Santirez /* If this instance is turned into a master and there are no
174994e8c9e7Santirez * slaves, it inherits the replication offset from the master.
175094e8c9e7Santirez * Under certain conditions this makes replicas comparable by
175194e8c9e7Santirez * replication offset to understand what is the most updated. */
175294e8c9e7Santirez server.master_repl_offset = server.master->reploff;
175394e8c9e7Santirez freeReplicationBacklog();
175494e8c9e7Santirez }
175594e8c9e7Santirez freeClient(server.master);
175694e8c9e7Santirez }
175707888202Santirez replicationDiscardCachedMaster();
1758ef99e146Santirez cancelReplicationHandshake();
175932f80e2fSantirez server.repl_state = REPL_STATE_NONE;
17607bead003Santirez }
17617bead003Santirez
1762278ea9d1Santirez /* This function is called when the slave lose the connection with the
1763278ea9d1Santirez * master into an unexpected way. */
replicationHandleMasterDisconnection(void)1764278ea9d1Santirez void replicationHandleMasterDisconnection(void) {
1765278ea9d1Santirez server.master = NULL;
1766278ea9d1Santirez server.repl_state = REPL_STATE_CONNECT;
1767278ea9d1Santirez server.repl_down_since = server.unixtime;
1768c1e94b6bSantirez /* We lost connection with our master, don't disconnect slaves yet,
1769c1e94b6bSantirez * maybe we'll be able to PSYNC with our master later. We'll disconnect
1770c1e94b6bSantirez * the slaves only if we'll have to do a full resync with our master. */
1771278ea9d1Santirez }
1772278ea9d1Santirez
/* SLAVEOF <host> <port> | SLAVEOF NO ONE
 *
 * "NO" "ONE" turns the instance into a master (no operation if it already
 * is one). Any other host/port pair sets the new master address, unless it
 * matches the currently configured master, in which case nothing is done
 * and a specific +OK status is returned.
 *
 * The port must be an integer in the 0-65535 range: anything else is
 * refused with an error, since the value is later stored into an int and
 * an out of range number would be silently truncated. */
void slaveofCommand(client *c) {
    /* SLAVEOF is not allowed in cluster mode as replication is automatically
     * configured using the current address of the master node. */
    if (server.cluster_enabled) {
        addReplyError(c,"SLAVEOF not allowed in cluster mode.");
        return;
    }

    /* The special host/port combination "NO" "ONE" turns the instance
     * into a master. Otherwise the new master address is set. */
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            sds client = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
                client);
            sdsfree(client);
        }
    } else {
        long port;

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
            return;

        /* Refuse ports that cannot be a TCP port: replicationSetMaster()
         * takes an int, so a larger value would be truncated. */
        if (port < 0 || port > 65535) {
            addReplyError(c,"Invalid master port");
            return;
        }

        /* Check if we are already attached to the specified master */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        replicationSetMaster(c->argv[1]->ptr, (int)port);
        sds client = catClientInfoString(sdsempty(),c);
        serverLog(LL_NOTICE,"SLAVE OF %s:%d enabled (user request from '%s')",
            server.masterhost, server.masterport, client);
        sdsfree(client);
    }
    addReply(c,shared.ok);
}
1815f4aa600bSantirez
1816d34c2fa3Santirez /* ROLE command: provide information about the role of the instance
1817d34c2fa3Santirez * (master or slave) and additional information related to replication
1818d34c2fa3Santirez * in an easy to process format. */
roleCommand(client * c)1819554bd0e7Santirez void roleCommand(client *c) {
1820d34c2fa3Santirez if (server.masterhost == NULL) {
1821d34c2fa3Santirez listIter li;
1822d34c2fa3Santirez listNode *ln;
1823d34c2fa3Santirez void *mbcount;
1824d34c2fa3Santirez int slaves = 0;
1825d34c2fa3Santirez
1826d34c2fa3Santirez addReplyMultiBulkLen(c,3);
1827d34c2fa3Santirez addReplyBulkCBuffer(c,"master",6);
1828d34c2fa3Santirez addReplyLongLong(c,server.master_repl_offset);
1829d34c2fa3Santirez mbcount = addDeferredMultiBulkLength(c);
1830d34c2fa3Santirez listRewind(server.slaves,&li);
1831d34c2fa3Santirez while((ln = listNext(&li))) {
1832554bd0e7Santirez client *slave = ln->value;
18330a45fbc3Santirez char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
1834d34c2fa3Santirez
18350a45fbc3Santirez if (slaveip[0] == '\0') {
18360a45fbc3Santirez if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1)
18370a45fbc3Santirez continue;
18380a45fbc3Santirez slaveip = ip;
18390a45fbc3Santirez }
184032f80e2fSantirez if (slave->replstate != SLAVE_STATE_ONLINE) continue;
1841d34c2fa3Santirez addReplyMultiBulkLen(c,3);
18420a45fbc3Santirez addReplyBulkCString(c,slaveip);
1843d34c2fa3Santirez addReplyBulkLongLong(c,slave->slave_listening_port);
1844d34c2fa3Santirez addReplyBulkLongLong(c,slave->repl_ack_off);
1845d34c2fa3Santirez slaves++;
1846d34c2fa3Santirez }
1847d34c2fa3Santirez setDeferredMultiBulkLength(c,mbcount,slaves);
1848d34c2fa3Santirez } else {
18496a13193dSantirez char *slavestate = NULL;
18506a13193dSantirez
18517970d539Santirez addReplyMultiBulkLen(c,5);
1852d34c2fa3Santirez addReplyBulkCBuffer(c,"slave",5);
1853d34c2fa3Santirez addReplyBulkCString(c,server.masterhost);
1854d34c2fa3Santirez addReplyLongLong(c,server.masterport);
1855e3344b80Santirez if (slaveIsInHandshakeState()) {
1856e3344b80Santirez slavestate = "handshake";
1857e3344b80Santirez } else {
18586a13193dSantirez switch(server.repl_state) {
185932f80e2fSantirez case REPL_STATE_NONE: slavestate = "none"; break;
186032f80e2fSantirez case REPL_STATE_CONNECT: slavestate = "connect"; break;
186132f80e2fSantirez case REPL_STATE_CONNECTING: slavestate = "connecting"; break;
186232f80e2fSantirez case REPL_STATE_TRANSFER: slavestate = "sync"; break;
186332f80e2fSantirez case REPL_STATE_CONNECTED: slavestate = "connected"; break;
18646a13193dSantirez default: slavestate = "unknown"; break;
18656a13193dSantirez }
1866e3344b80Santirez }
18676a13193dSantirez addReplyBulkCString(c,slavestate);
18686a13193dSantirez addReplyLongLong(c,server.master ? server.master->reploff : -1);
1869d34c2fa3Santirez }
1870d34c2fa3Santirez }
1871d34c2fa3Santirez
1872efd87031Santirez /* Send a REPLCONF ACK command to the master to inform it about the current
1873efd87031Santirez * processed offset. If we are not connected with a master, the command has
1874efd87031Santirez * no effects. */
replicationSendAck(void)1875efd87031Santirez void replicationSendAck(void) {
1876554bd0e7Santirez client *c = server.master;
1877efd87031Santirez
1878efd87031Santirez if (c != NULL) {
187932f80e2fSantirez c->flags |= CLIENT_MASTER_FORCE_REPLY;
1880efd87031Santirez addReplyMultiBulkLen(c,3);
1881efd87031Santirez addReplyBulkCString(c,"REPLCONF");
1882efd87031Santirez addReplyBulkCString(c,"ACK");
1883efd87031Santirez addReplyBulkLongLong(c,c->reploff);
188432f80e2fSantirez c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
1885efd87031Santirez }
1886efd87031Santirez }
1887efd87031Santirez
188807888202Santirez /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
188907888202Santirez
189007888202Santirez /* In order to implement partial synchronization we need to be able to cache
189107888202Santirez * our master's client structure after a transient disconnection.
189207888202Santirez * It is cached into server.cached_master and flushed away using the following
189307888202Santirez * functions. */
189407888202Santirez
189507888202Santirez /* This function is called by freeClient() in order to cache the master
189607888202Santirez * client structure instead of destryoing it. freeClient() will return
189707888202Santirez * ASAP after this function returns, so every action needed to avoid problems
189807888202Santirez * with a client that is really "suspended" has to be done by this function.
189907888202Santirez *
190007888202Santirez * The other functions that will deal with the cached master are:
190107888202Santirez *
190207888202Santirez * replicationDiscardCachedMaster() that will make sure to kill the client
190307888202Santirez * as for some reason we don't want to use it in the future.
190407888202Santirez *
190507888202Santirez * replicationResurrectCachedMaster() that is used after a successful PSYNC
190607888202Santirez * handshake in order to reactivate the cached master.
190707888202Santirez */
replicationCacheMaster(client * c)1908554bd0e7Santirez void replicationCacheMaster(client *c) {
19092d9e3eb1Santirez serverAssert(server.master != NULL && server.cached_master == NULL);
191032f80e2fSantirez serverLog(LL_NOTICE,"Caching the disconnected master state.");
191107888202Santirez
191247e6cf11Santirez /* Unlink the client from the server structures. */
191347e6cf11Santirez unlinkClient(c);
191423e7710cSantirez
191507888202Santirez /* Save the master. Server.master will be set to null later by
191607888202Santirez * replicationHandleMasterDisconnection(). */
191707888202Santirez server.cached_master = server.master;
191807888202Santirez
19190bcc7cb4Santirez /* Invalidate the Peer ID cache. */
19200bcc7cb4Santirez if (c->peerid) {
19210bcc7cb4Santirez sdsfree(c->peerid);
19220bcc7cb4Santirez c->peerid = NULL;
19230bcc7cb4Santirez }
19240bcc7cb4Santirez
192507888202Santirez /* Caching the master happens instead of the actual freeClient() call,
192607888202Santirez * so make sure to adjust the replication state. This function will
192707888202Santirez * also set server.master to NULL. */
192807888202Santirez replicationHandleMasterDisconnection();
192907888202Santirez }
193007888202Santirez
193107888202Santirez /* Free a cached master, called when there are no longer the conditions for
193207888202Santirez * a partial resync on reconnection. */
replicationDiscardCachedMaster(void)193307888202Santirez void replicationDiscardCachedMaster(void) {
193407888202Santirez if (server.cached_master == NULL) return;
193507888202Santirez
193632f80e2fSantirez serverLog(LL_NOTICE,"Discarding previously cached master state.");
193732f80e2fSantirez server.cached_master->flags &= ~CLIENT_MASTER;
193807888202Santirez freeClient(server.cached_master);
193907888202Santirez server.cached_master = NULL;
194007888202Santirez }
194107888202Santirez
194207888202Santirez /* Turn the cached master into the current master, using the file descriptor
194307888202Santirez * passed as argument as the socket for the new master.
194407888202Santirez *
19453a82b8acSAaron Rutkovsky * This function is called when successfully setup a partial resynchronization
194607888202Santirez * so the stream of data that we'll receive will start from were this
194707888202Santirez * master left. */
replicationResurrectCachedMaster(int newfd)194807888202Santirez void replicationResurrectCachedMaster(int newfd) {
194907888202Santirez server.master = server.cached_master;
195007888202Santirez server.cached_master = NULL;
195107888202Santirez server.master->fd = newfd;
195232f80e2fSantirez server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
195307888202Santirez server.master->authenticated = 1;
195407888202Santirez server.master->lastinteraction = server.unixtime;
195532f80e2fSantirez server.repl_state = REPL_STATE_CONNECTED;
195607888202Santirez
195707888202Santirez /* Re-add to the list of clients. */
195807888202Santirez listAddNodeTail(server.clients,server.master);
195907888202Santirez if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
196007888202Santirez readQueryFromClient, server.master)) {
196132f80e2fSantirez serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
196207888202Santirez freeClientAsync(server.master); /* Close ASAP. */
196307888202Santirez }
19641461422cSantirez
19651461422cSantirez /* We may also need to install the write handler as well if there is
19661461422cSantirez * pending data in the write buffers. */
19672d21af45Santirez if (clientHasPendingReplies(server.master)) {
19681461422cSantirez if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
19691461422cSantirez sendReplyToClient, server.master)) {
197032f80e2fSantirez serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
19711461422cSantirez freeClientAsync(server.master); /* Close ASAP. */
19721461422cSantirez }
19731461422cSantirez }
197407888202Santirez }
197507888202Santirez
1976ed599d3aSantirez /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */
1977ed599d3aSantirez
1978ed599d3aSantirez /* This function counts the number of slaves with lag <= min-slaves-max-lag.
1979ed599d3aSantirez * If the option is active, the server will prevent writes if there are not
1980ed599d3aSantirez * enough connected slaves with the specified lag (or less). */
refreshGoodSlavesCount(void)1981ed599d3aSantirez void refreshGoodSlavesCount(void) {
1982ed599d3aSantirez listIter li;
1983ed599d3aSantirez listNode *ln;
1984ed599d3aSantirez int good = 0;
1985ed599d3aSantirez
1986ed599d3aSantirez if (!server.repl_min_slaves_to_write ||
1987ed599d3aSantirez !server.repl_min_slaves_max_lag) return;
1988ed599d3aSantirez
1989ed599d3aSantirez listRewind(server.slaves,&li);
1990ed599d3aSantirez while((ln = listNext(&li))) {
1991554bd0e7Santirez client *slave = ln->value;
1992ed599d3aSantirez time_t lag = server.unixtime - slave->repl_ack_time;
1993ed599d3aSantirez
199432f80e2fSantirez if (slave->replstate == SLAVE_STATE_ONLINE &&
1995ed599d3aSantirez lag <= server.repl_min_slaves_max_lag) good++;
1996ed599d3aSantirez }
1997ed599d3aSantirez server.repl_good_slaves_count = good;
1998ed599d3aSantirez }
1999ed599d3aSantirez
200094ec7db4Santirez /* ----------------------- REPLICATION SCRIPT CACHE --------------------------
200194ec7db4Santirez * The goal of this code is to keep track of scripts already sent to every
200294ec7db4Santirez * connected slave, in order to be able to replicate EVALSHA as it is without
200394ec7db4Santirez * translating it to EVAL every time it is possible.
200494ec7db4Santirez *
200511e81a1eSantirez * We use a capped collection implemented by a hash table for fast lookup
200694ec7db4Santirez * of scripts we can send as EVALSHA, plus a linked list that is used for
200794ec7db4Santirez * eviction of the oldest entry when the max number of items is reached.
200894ec7db4Santirez *
200994ec7db4Santirez * We don't care about taking a different cache for every different slave
201094ec7db4Santirez * since to fill the cache again is not very costly, the goal of this code
201194ec7db4Santirez * is to avoid that the same big script is transmitted a big number of times
201294ec7db4Santirez * per second wasting bandwidth and processor speed, but it is not a problem
201394ec7db4Santirez * if we need to rebuild the cache from scratch from time to time, every used
201494ec7db4Santirez * script will need to be transmitted a single time to reappear in the cache.
201594ec7db4Santirez *
201694ec7db4Santirez * This is how the system works:
201794ec7db4Santirez *
201894ec7db4Santirez * 1) Every time a new slave connects, we flush the whole script cache.
201994ec7db4Santirez * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
202094ec7db4Santirez * trying to convert EVAL into EVALSHA specifically for slaves.
202194ec7db4Santirez * 3) Every time we transmit a script as EVAL to the slaves, we also add the
202294ec7db4Santirez * corresponding SHA1 of the script into the cache as we are sure every
202394ec7db4Santirez * slave knows about the script starting from now.
202494ec7db4Santirez * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves
202594ec7db4Santirez * and at the same time flush the script cache.
202694ec7db4Santirez * 5) When the last slave disconnects, flush the cache.
202794ec7db4Santirez * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded
202894ec7db4Santirez * in the master sometimes.
202994ec7db4Santirez */
203094ec7db4Santirez
203194ec7db4Santirez /* Initialize the script cache, only called at startup. */
replicationScriptCacheInit(void)203294ec7db4Santirez void replicationScriptCacheInit(void) {
203394ec7db4Santirez server.repl_scriptcache_size = 10000;
203494ec7db4Santirez server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL);
203594ec7db4Santirez server.repl_scriptcache_fifo = listCreate();
203694ec7db4Santirez }
203794ec7db4Santirez
203894ec7db4Santirez /* Empty the script cache. Should be called every time we are no longer sure
2039f0bf5fd8Santirez * that every slave knows about all the scripts in our set, or when the
2040f0bf5fd8Santirez * current AOF "context" is no longer aware of the script. In general we
2041f0bf5fd8Santirez * should flush the cache:
2042f0bf5fd8Santirez *
2043f0bf5fd8Santirez * 1) Every time a new slave reconnects to this master and performs a
2044f0bf5fd8Santirez * full SYNC (PSYNC does not require flushing).
2045f0bf5fd8Santirez * 2) Every time an AOF rewrite is performed.
2046f0bf5fd8Santirez * 3) Every time we are left without slaves at all, and AOF is off, in order
2047f0bf5fd8Santirez * to reclaim otherwise unused memory.
2048f0bf5fd8Santirez */
replicationScriptCacheFlush(void)204994ec7db4Santirez void replicationScriptCacheFlush(void) {
20502eb781b3Santirez dictEmpty(server.repl_scriptcache_dict,NULL);
205194ec7db4Santirez listRelease(server.repl_scriptcache_fifo);
205294ec7db4Santirez server.repl_scriptcache_fifo = listCreate();
205394ec7db4Santirez }
205494ec7db4Santirez
205594ec7db4Santirez /* Add an entry into the script cache, if we reach max number of entries the
205694ec7db4Santirez * oldest is removed from the list. */
replicationScriptCacheAdd(sds sha1)205794ec7db4Santirez void replicationScriptCacheAdd(sds sha1) {
205894ec7db4Santirez int retval;
205994ec7db4Santirez sds key = sdsdup(sha1);
206094ec7db4Santirez
206194ec7db4Santirez /* Evict oldest. */
206294ec7db4Santirez if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size)
206394ec7db4Santirez {
206494ec7db4Santirez listNode *ln = listLast(server.repl_scriptcache_fifo);
206594ec7db4Santirez sds oldest = listNodeValue(ln);
206694ec7db4Santirez
206794ec7db4Santirez retval = dictDelete(server.repl_scriptcache_dict,oldest);
20682d9e3eb1Santirez serverAssert(retval == DICT_OK);
206994ec7db4Santirez listDelNode(server.repl_scriptcache_fifo,ln);
207094ec7db4Santirez }
207194ec7db4Santirez
207294ec7db4Santirez /* Add current. */
207394ec7db4Santirez retval = dictAdd(server.repl_scriptcache_dict,key,NULL);
207494ec7db4Santirez listAddNodeHead(server.repl_scriptcache_fifo,key);
20752d9e3eb1Santirez serverAssert(retval == DICT_OK);
207694ec7db4Santirez }
207794ec7db4Santirez
207894ec7db4Santirez /* Returns non-zero if the specified entry exists inside the cache, that is,
207994ec7db4Santirez * if all the slaves are aware of this script SHA1. */
replicationScriptCacheExists(sds sha1)208094ec7db4Santirez int replicationScriptCacheExists(sds sha1) {
2081f0bf5fd8Santirez return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
208294ec7db4Santirez }
208394ec7db4Santirez
2084c5618e7fSantirez /* ----------------------- SYNCHRONOUS REPLICATION --------------------------
2085c5618e7fSantirez * Redis synchronous replication design can be summarized in points:
2086c5618e7fSantirez *
2087c5618e7fSantirez * - Redis masters have a global replication offset, used by PSYNC.
2088c5618e7fSantirez * - Masters increment the offset every time new commands are sent to slaves.
2089c5618e7fSantirez * - Slaves ping back masters with the offset processed so far.
2090c5618e7fSantirez *
2091c5618e7fSantirez * So synchronous replication adds a new WAIT command in the form:
2092c5618e7fSantirez *
2093c5618e7fSantirez * WAIT <num_replicas> <milliseconds_timeout>
2094c5618e7fSantirez *
2095c5618e7fSantirez * That returns the number of replicas that processed the query when
2096c5618e7fSantirez * we finally have at least num_replicas, or when the timeout was
2097c5618e7fSantirez * reached.
2098c5618e7fSantirez *
2099c5618e7fSantirez * The command is implemented in this way:
2100c5618e7fSantirez *
2101c5618e7fSantirez * - Every time a client processes a command, we remember the replication
2102c5618e7fSantirez * offset after sending that command to the slaves.
2103c5618e7fSantirez * - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
2104c5618e7fSantirez * The client is blocked at the same time (see blocked.c).
2105c5618e7fSantirez * - Once we receive enough ACKs for a given offset or when the timeout
2106c5618e7fSantirez * is reached, the WAIT command is unblocked and the reply sent to the
2107c5618e7fSantirez * client.
2108c5618e7fSantirez */
2109c5618e7fSantirez
2110c5618e7fSantirez /* This just set a flag so that we broadcast a REPLCONF GETACK command
2111c5618e7fSantirez * to all the slaves in the beforeSleep() function. Note that this way
2112c5618e7fSantirez * we "group" all the clients that want to wait for synchronouns replication
2113c5618e7fSantirez * in a given event loop iteration, and send a single GETACK for them all. */
replicationRequestAckFromSlaves(void)2114c5618e7fSantirez void replicationRequestAckFromSlaves(void) {
2115c5618e7fSantirez server.get_ack_from_slaves = 1;
2116c5618e7fSantirez }
2117c5618e7fSantirez
2118c5618e7fSantirez /* Return the number of slaves that already acknowledged the specified
2119c5618e7fSantirez * replication offset. */
replicationCountAcksByOffset(long long offset)2120c5618e7fSantirez int replicationCountAcksByOffset(long long offset) {
2121c5618e7fSantirez listIter li;
2122c5618e7fSantirez listNode *ln;
2123c5618e7fSantirez int count = 0;
2124c5618e7fSantirez
2125c5618e7fSantirez listRewind(server.slaves,&li);
2126c5618e7fSantirez while((ln = listNext(&li))) {
2127554bd0e7Santirez client *slave = ln->value;
2128c5618e7fSantirez
212932f80e2fSantirez if (slave->replstate != SLAVE_STATE_ONLINE) continue;
2130c5618e7fSantirez if (slave->repl_ack_off >= offset) count++;
2131c5618e7fSantirez }
2132c5618e7fSantirez return count;
2133c5618e7fSantirez }
2134c5618e7fSantirez
2135c5618e7fSantirez /* WAIT for N replicas to acknowledge the processing of our latest
2136c5618e7fSantirez * write command (and all the previous commands). */
waitCommand(client * c)2137554bd0e7Santirez void waitCommand(client *c) {
2138c5618e7fSantirez mstime_t timeout;
2139c5618e7fSantirez long numreplicas, ackreplicas;
2140c5618e7fSantirez long long offset = c->woff;
2141c5618e7fSantirez
2142c5618e7fSantirez /* Argument parsing. */
214340eb548aSantirez if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
2144c5618e7fSantirez return;
2145c5618e7fSantirez if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
214640eb548aSantirez != C_OK) return;
2147c5618e7fSantirez
2148c5618e7fSantirez /* First try without blocking at all. */
2149c5618e7fSantirez ackreplicas = replicationCountAcksByOffset(c->woff);
215032f80e2fSantirez if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) {
2151c5618e7fSantirez addReplyLongLong(c,ackreplicas);
2152c5618e7fSantirez return;
2153c5618e7fSantirez }
2154c5618e7fSantirez
2155c5618e7fSantirez /* Otherwise block the client and put it into our list of clients
2156c5618e7fSantirez * waiting for ack from slaves. */
2157c5618e7fSantirez c->bpop.timeout = timeout;
2158c5618e7fSantirez c->bpop.reploffset = offset;
2159c5618e7fSantirez c->bpop.numreplicas = numreplicas;
2160c5618e7fSantirez listAddNodeTail(server.clients_waiting_acks,c);
216132f80e2fSantirez blockClient(c,BLOCKED_WAIT);
2162c5618e7fSantirez
2163c5618e7fSantirez /* Make sure that the server will send an ACK request to all the slaves
2164c5618e7fSantirez * before returning to the event loop. */
2165c5618e7fSantirez replicationRequestAckFromSlaves();
2166c5618e7fSantirez }
2167c5618e7fSantirez
2168c5618e7fSantirez /* This is called by unblockClient() to perform the blocking op type
2169c5618e7fSantirez * specific cleanup. We just remove the client from the list of clients
2170c5618e7fSantirez * waiting for replica acks. Never call it directly, call unblockClient()
2171c5618e7fSantirez * instead. */
unblockClientWaitingReplicas(client * c)2172554bd0e7Santirez void unblockClientWaitingReplicas(client *c) {
2173c5618e7fSantirez listNode *ln = listSearchKey(server.clients_waiting_acks,c);
21742d9e3eb1Santirez serverAssert(ln != NULL);
2175c5618e7fSantirez listDelNode(server.clients_waiting_acks,ln);
2176c5618e7fSantirez }
2177c5618e7fSantirez
2178c5618e7fSantirez /* Check if there are clients blocked in WAIT that can be unblocked since
2179c5618e7fSantirez * we received enough ACKs from slaves. */
processClientsWaitingReplicas(void)2180c5618e7fSantirez void processClientsWaitingReplicas(void) {
2181c5618e7fSantirez long long last_offset = 0;
2182c5618e7fSantirez int last_numreplicas = 0;
2183c5618e7fSantirez
2184c5618e7fSantirez listIter li;
2185c5618e7fSantirez listNode *ln;
2186c5618e7fSantirez
2187c5618e7fSantirez listRewind(server.clients_waiting_acks,&li);
2188c5618e7fSantirez while((ln = listNext(&li))) {
2189554bd0e7Santirez client *c = ln->value;
2190c5618e7fSantirez
2191c5618e7fSantirez /* Every time we find a client that is satisfied for a given
2192c5618e7fSantirez * offset and number of replicas, we remember it so the next client
2193c5618e7fSantirez * may be unblocked without calling replicationCountAcksByOffset()
2194c5618e7fSantirez * if the requested offset / replicas were equal or less. */
2195c5618e7fSantirez if (last_offset && last_offset > c->bpop.reploffset &&
2196c5618e7fSantirez last_numreplicas > c->bpop.numreplicas)
2197c5618e7fSantirez {
2198c5618e7fSantirez unblockClient(c);
2199c5618e7fSantirez addReplyLongLong(c,last_numreplicas);
2200c5618e7fSantirez } else {
2201c5618e7fSantirez int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
2202c5618e7fSantirez
2203c5618e7fSantirez if (numreplicas >= c->bpop.numreplicas) {
2204c5618e7fSantirez last_offset = c->bpop.reploffset;
2205c5618e7fSantirez last_numreplicas = numreplicas;
2206c5618e7fSantirez unblockClient(c);
2207c5618e7fSantirez addReplyLongLong(c,numreplicas);
2208c5618e7fSantirez }
2209c5618e7fSantirez }
2210c5618e7fSantirez }
2211c5618e7fSantirez }
2212c5618e7fSantirez
22136f540320Santirez /* Return the slave replication offset for this instance, that is
22146f540320Santirez * the offset for which we already processed the master replication stream. */
replicationGetSlaveOffset(void)22156f540320Santirez long long replicationGetSlaveOffset(void) {
22166f540320Santirez long long offset = 0;
22176f540320Santirez
22186f540320Santirez if (server.masterhost != NULL) {
22196f540320Santirez if (server.master) {
22206f540320Santirez offset = server.master->reploff;
22216f540320Santirez } else if (server.cached_master) {
22226f540320Santirez offset = server.cached_master->reploff;
22236f540320Santirez }
22246f540320Santirez }
22256f540320Santirez /* offset may be -1 when the master does not support it at all, however
22266f540320Santirez * this function is designed to return an offset that can express the
22276f540320Santirez * amount of data processed by the master, so we return a positive
22286f540320Santirez * integer. */
22296f540320Santirez if (offset < 0) offset = 0;
22306f540320Santirez return offset;
22316f540320Santirez }
22326f540320Santirez
2233c5618e7fSantirez /* --------------------------- REPLICATION CRON ---------------------------- */
2234f4aa600bSantirez
22353a82b8acSAaron Rutkovsky /* Replication cron function, called 1 time per second. */
replicationCron(void)2236f4aa600bSantirez void replicationCron(void) {
223755ba7727Santirez static long long replication_cron_loops = 0;
223855ba7727Santirez
223927acd7aaSantirez /* Non blocking connection timeout? */
2240bb66fc31Santirez if (server.masterhost &&
224132f80e2fSantirez (server.repl_state == REPL_STATE_CONNECTING ||
2242e3344b80Santirez slaveIsInHandshakeState()) &&
224327acd7aaSantirez (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
224427acd7aaSantirez {
224532f80e2fSantirez serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
2246c5f8c80aSantirez cancelReplicationHandshake();
224727acd7aaSantirez }
224827acd7aaSantirez
2249f4aa600bSantirez /* Bulk transfer I/O timeout? */
225032f80e2fSantirez if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
22518996bf77Santirez (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
2252f4aa600bSantirez {
225332f80e2fSantirez serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
2254c5f8c80aSantirez cancelReplicationHandshake();
2255f4aa600bSantirez }
2256f4aa600bSantirez
225789a1433eSantirez /* Timed out master when we are an already connected slave? */
225832f80e2fSantirez if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
22598996bf77Santirez (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
226089a1433eSantirez {
226132f80e2fSantirez serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
226289a1433eSantirez freeClient(server.master);
226389a1433eSantirez }
226489a1433eSantirez
2265f4aa600bSantirez /* Check if we should connect to a MASTER */
226632f80e2fSantirez if (server.repl_state == REPL_STATE_CONNECT) {
226732f80e2fSantirez serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
2268b2f83439Santirez server.masterhost, server.masterport);
226940eb548aSantirez if (connectWithMaster() == C_OK) {
227032f80e2fSantirez serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
2271f4aa600bSantirez }
2272f4aa600bSantirez }
2273e06a5604Santirez
227490a81b4eSantirez /* Send ACK to master from time to time.
227590a81b4eSantirez * Note that we do not send periodic acks to masters that don't
227690a81b4eSantirez * support PSYNC and replication offsets. */
227790a81b4eSantirez if (server.masterhost && server.master &&
227832f80e2fSantirez !(server.master->flags & CLIENT_PRE_PSYNC))
2279e06a5604Santirez replicationSendAck();
228089a1433eSantirez
228189a1433eSantirez /* If we have attached slaves, PING them from time to time.
228289a1433eSantirez * So slaves can implement an explicit timeout to masters, and will
228389a1433eSantirez * be able to detect a link disconnection even if the TCP connection
228489a1433eSantirez * will not actually go down. */
228589a1433eSantirez listIter li;
228689a1433eSantirez listNode *ln;
22874b83ad4eSantirez robj *ping_argv[1];
228889a1433eSantirez
228955ba7727Santirez /* First, send PING according to ping_slave_period. */
229055ba7727Santirez if ((replication_cron_loops % server.repl_ping_slave_period) == 0) {
22914b83ad4eSantirez ping_argv[0] = createStringObject("PING",4);
229255ba7727Santirez replicationFeedSlaves(server.slaves, server.slaveseldb,
229355ba7727Santirez ping_argv, 1);
22944b83ad4eSantirez decrRefCount(ping_argv[0]);
229555ba7727Santirez }
22964b83ad4eSantirez
229707888202Santirez /* Second, send a newline to all the slaves in pre-synchronization
229807888202Santirez * stage, that is, slaves waiting for the master to create the RDB file.
22994b83ad4eSantirez * The newline will be ignored by the slave but will refresh the
230055ba7727Santirez * last-io timer preventing a timeout. In this case we ignore the
230155ba7727Santirez * ping period and refresh the connection once per second since certain
230255ba7727Santirez * timeouts are set at a few seconds (example: PSYNC response). */
230389a1433eSantirez listRewind(server.slaves,&li);
230489a1433eSantirez while((ln = listNext(&li))) {
2305554bd0e7Santirez client *slave = ln->value;
230689a1433eSantirez
230732f80e2fSantirez if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
230832f80e2fSantirez (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
230932f80e2fSantirez server.rdb_child_type != RDB_CHILD_TYPE_SOCKET))
23104b16263bSantirez {
2311f96a9f82Santirez if (write(slave->fd, "\n", 1) == -1) {
2312f96a9f82Santirez /* Don't worry, it's just a ping. */
2313f96a9f82Santirez }
231489a1433eSantirez }
231589a1433eSantirez }
231607888202Santirez
23173c82c85fSantirez /* Disconnect timed-out slaves. */
23183c82c85fSantirez if (listLength(server.slaves)) {
23193c82c85fSantirez listIter li;
23203c82c85fSantirez listNode *ln;
23213c82c85fSantirez
23223c82c85fSantirez listRewind(server.slaves,&li);
23233c82c85fSantirez while((ln = listNext(&li))) {
2324554bd0e7Santirez client *slave = ln->value;
23253c82c85fSantirez
232632f80e2fSantirez if (slave->replstate != SLAVE_STATE_ONLINE) continue;
232732f80e2fSantirez if (slave->flags & CLIENT_PRE_PSYNC) continue;
23283c82c85fSantirez if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
23293c82c85fSantirez {
233032f80e2fSantirez serverLog(LL_WARNING, "Disconnecting timedout slave: %s",
23318a416ca4Santirez replicationGetSlaveName(slave));
2332f5c6ebbfSantirez freeClient(slave);
23333c82c85fSantirez }
23343c82c85fSantirez }
23353c82c85fSantirez }
23363c82c85fSantirez
233707888202Santirez /* If we have no attached slaves and there is a replication backlog
233807888202Santirez * using memory, free it after some (configured) time. */
233907888202Santirez if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
234007888202Santirez server.repl_backlog)
234107888202Santirez {
234207888202Santirez time_t idle = server.unixtime - server.repl_no_slaves_since;
234307888202Santirez
234407888202Santirez if (idle > server.repl_backlog_time_limit) {
234507888202Santirez freeReplicationBacklog();
234632f80e2fSantirez serverLog(LL_NOTICE,
234707888202Santirez "Replication backlog freed after %d seconds "
2348f9b5ca29Santirez "without connected slaves.",
2349f9b5ca29Santirez (int) server.repl_backlog_time_limit);
235007888202Santirez }
235107888202Santirez }
2352ed599d3aSantirez
2353f0bf5fd8Santirez /* If AOF is disabled and we no longer have attached slaves, we can
2354f0bf5fd8Santirez * free our Replication Script Cache as there is no need to propagate
2355f0bf5fd8Santirez * EVALSHA at all. */
2356f0bf5fd8Santirez if (listLength(server.slaves) == 0 &&
235732f80e2fSantirez server.aof_state == AOF_OFF &&
2358f0bf5fd8Santirez listLength(server.repl_scriptcache_fifo) != 0)
2359f0bf5fd8Santirez {
2360f0bf5fd8Santirez replicationScriptCacheFlush();
2361f0bf5fd8Santirez }
2362f0bf5fd8Santirez
2363017378ecSantirez /* Start a BGSAVE good for replication if we have slaves in
2364017378ecSantirez * WAIT_BGSAVE_START state.
236542951ab3Santirez *
2366017378ecSantirez * In case of diskless replication, we make sure to wait for the specified
2367017378ecSantirez * number of seconds (according to configuration) so that other slaves
2368017378ecSantirez * have the time to arrive before we start streaming. */
236942951ab3Santirez if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
2370e9e00755Santirez time_t idle, max_idle = 0;
2371e9e00755Santirez int slaves_waiting = 0;
23723e6d4d59Santirez int mincapa = -1;
2373e9e00755Santirez listNode *ln;
2374e9e00755Santirez listIter li;
2375e9e00755Santirez
2376e9e00755Santirez listRewind(server.slaves,&li);
2377e9e00755Santirez while((ln = listNext(&li))) {
2378554bd0e7Santirez client *slave = ln->value;
237932f80e2fSantirez if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
2380e9e00755Santirez idle = server.unixtime - slave->lastinteraction;
2381e9e00755Santirez if (idle > max_idle) max_idle = idle;
2382e9e00755Santirez slaves_waiting++;
23833e6d4d59Santirez mincapa = (mincapa == -1) ? slave->slave_capa :
23843e6d4d59Santirez (mincapa & slave->slave_capa);
2385e9e00755Santirez }
2386e9e00755Santirez }
2387e9e00755Santirez
2388017378ecSantirez if (slaves_waiting &&
2389017378ecSantirez (!server.repl_diskless_sync ||
2390017378ecSantirez max_idle > server.repl_diskless_sync_delay))
2391017378ecSantirez {
2392017378ecSantirez /* Start the BGSAVE. The called function may start a
2393017378ecSantirez * BGSAVE with socket target or disk target depending on the
2394017378ecSantirez * configuration and slaves capabilities. */
2395f18e5b63Santirez startBgsaveForReplication(mincapa);
2396e9e00755Santirez }
2397e9e00755Santirez }
2398e9e00755Santirez
2399ed599d3aSantirez /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
2400ed599d3aSantirez refreshGoodSlavesCount();
240155ba7727Santirez replication_cron_loops++; /* Incremented with frequency 1 HZ. */
2402f4aa600bSantirez }
2403