xref: /redis-3.2.3/src/cluster.c (revision f592b4d3)
/* Redis Cluster implementation.
 *
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "server.h"
#include "cluster.h"
#include "endianconv.h"

#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <math.h>

/* A global reference to myself is handy to make code more clear.
 * Myself always points to server.cluster->myself, that is, the clusterNode
 * that represents this node. */
clusterNode *myself = NULL;

clusterNode *createClusterNode(char *nodename, int flags);
int clusterAddNode(clusterNode *node);
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterSendPing(clusterLink *link, int type);
void clusterSendFail(char *nodename);
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter);
clusterNode *clusterLookupNode(char *name);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetMaster(clusterNode *n);
void clusterHandleSlaveFailover(void);
void clusterHandleSlaveMigration(int max_slaves);
int bitmapTestBit(unsigned char *bitmap, int pos);
void clusterDoBeforeSleep(int flags);
void clusterSendUpdate(clusterLink *link, clusterNode *node);
void resetManualFailover(void);
void clusterCloseAllSlots(void);
void clusterSetNodeAsMaster(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
sds representClusterNodeFlags(sds ci, uint16_t flags);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);

/* -----------------------------------------------------------------------------
 * Initialization
 * -------------------------------------------------------------------------- */

/* Load the cluster config from 'filename'.
 *
 * If the file does not exist or is zero-length (this may happen because
 * when we lock the nodes.conf file, we create a zero-length one for the
 * sake of locking if it does not already exist), C_ERR is returned.
 * If the configuration was loaded from the file, C_OK is returned. */
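/* As a hedged illustration (the values below are made up, taken from the
 * CLUSTER NODES documentation style), a regular nodes.conf line follows the
 * CLUSTER NODES layout parsed below:
 *
 *   <id> <ip:port> <flags> <master|-> <ping-sent> <pong-recv> <config-epoch>
 *   <link-state> [<slot> | <start>-<stop> | [<slot>-<|>-<nodeid>] ...]
 *
 * For example (a single line, wrapped here for readability):
 *
 *   07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004 slave
 *   e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected
 *
 * Fields argv[0]..argv[7] are fixed; argv[8] and following are slot
 * specifications, including bracketed importing / migrating entries. */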
int clusterLoadConfig(char *filename) {
    FILE *fp = fopen(filename,"r");
    struct stat sb;
    char *line;
    int maxline, j;

    if (fp == NULL) {
        if (errno == ENOENT) {
            return C_ERR;
        } else {
            serverLog(LL_WARNING,
                "Loading the cluster node config from %s: %s",
                filename, strerror(errno));
            exit(1);
        }
    }

    /* Check if the file is zero-length: if so return C_ERR to signal
     * we have to write the config. */
    if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        fclose(fp);
        return C_ERR;
    }

    /* Parse the file. Note that single lines of the cluster config file can
     * be really long as they include all the hash slots of the node.
     * This means that in the worst possible case, half of the Redis slots
     * will be present in a single line, possibly in importing or migrating
     * state, together with the node ID of the sender/receiver.
     *
     * To simplify things, we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
    maxline = 1024+CLUSTER_SLOTS*128;
    line = zmalloc(maxline);
    while(fgets(line,maxline,fp) != NULL) {
        int argc;
        sds *argv;
        clusterNode *n, *master;
        char *p, *s;

        /* Skip blank lines, they can be created either by users manually
         * editing nodes.conf or by the config writing process if stopped
         * before the truncate() call. */
        if (line[0] == '\n') continue;

        /* Split the line into arguments for processing. */
        argv = sdssplitargs(line,&argc);
        if (argv == NULL) goto fmterr;

        /* Handle the special "vars" line. Don't assume it is the last
         * line even if it actually is when generated by Redis. */
        if (strcasecmp(argv[0],"vars") == 0) {
            for (j = 1; j < argc; j += 2) {
                if (strcasecmp(argv[j],"currentEpoch") == 0) {
                    server.cluster->currentEpoch =
                            strtoull(argv[j+1],NULL,10);
                } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
                    server.cluster->lastVoteEpoch =
                            strtoull(argv[j+1],NULL,10);
                } else {
                    serverLog(LL_WARNING,
                        "Skipping unknown cluster config variable '%s'",
                        argv[j]);
                }
            }
            sdsfreesplitres(argv,argc);
            continue;
        }

        /* Regular config lines have at least eight fields */
        if (argc < 8) goto fmterr;

        /* Create this node if it does not exist */
        n = clusterLookupNode(argv[0]);
        if (!n) {
            n = createClusterNode(argv[0],0);
            clusterAddNode(n);
        }
        /* Address and port */
        if ((p = strrchr(argv[1],':')) == NULL) goto fmterr;
        *p = '\0';
        memcpy(n->ip,argv[1],strlen(argv[1])+1);
        n->port = atoi(p+1);

        /* Parse flags */
        p = s = argv[2];
        while(p) {
            p = strchr(s,',');
            if (p) *p = '\0';
            if (!strcasecmp(s,"myself")) {
                serverAssert(server.cluster->myself == NULL);
                myself = server.cluster->myself = n;
                n->flags |= CLUSTER_NODE_MYSELF;
            } else if (!strcasecmp(s,"master")) {
                n->flags |= CLUSTER_NODE_MASTER;
            } else if (!strcasecmp(s,"slave")) {
                n->flags |= CLUSTER_NODE_SLAVE;
            } else if (!strcasecmp(s,"fail?")) {
                n->flags |= CLUSTER_NODE_PFAIL;
            } else if (!strcasecmp(s,"fail")) {
                n->flags |= CLUSTER_NODE_FAIL;
                n->fail_time = mstime();
            } else if (!strcasecmp(s,"handshake")) {
                n->flags |= CLUSTER_NODE_HANDSHAKE;
            } else if (!strcasecmp(s,"noaddr")) {
                n->flags |= CLUSTER_NODE_NOADDR;
            } else if (!strcasecmp(s,"noflags")) {
                /* nothing to do */
            } else {
                serverPanic("Unknown flag in redis cluster config file");
            }
            if (p) s = p+1;
        }

        /* Get master if any. Set the master and populate master's
         * slave list. */
        if (argv[3][0] != '-') {
            master = clusterLookupNode(argv[3]);
            if (!master) {
                master = createClusterNode(argv[3],0);
                clusterAddNode(master);
            }
            n->slaveof = master;
            clusterNodeAddSlave(master,n);
        }

        /* Set ping sent / pong received timestamps */
        if (atoi(argv[4])) n->ping_sent = mstime();
        if (atoi(argv[5])) n->pong_received = mstime();

        /* Set configEpoch for this node. */
        n->configEpoch = strtoull(argv[6],NULL,10);

        /* Populate hash slots served by this instance. */
        for (j = 8; j < argc; j++) {
            int start, stop;

            if (argv[j][0] == '[') {
                /* Here we handle migrating / importing slots */
                int slot;
                char direction;
                clusterNode *cn;

                p = strchr(argv[j],'-');
                serverAssert(p != NULL);
                *p = '\0';
                direction = p[1]; /* Either '>' or '<' */
                slot = atoi(argv[j]+1);
                p += 3;
                cn = clusterLookupNode(p);
                if (!cn) {
                    cn = createClusterNode(p,0);
                    clusterAddNode(cn);
                }
                if (direction == '>') {
                    server.cluster->migrating_slots_to[slot] = cn;
                } else {
                    server.cluster->importing_slots_from[slot] = cn;
                }
                continue;
            } else if ((p = strchr(argv[j],'-')) != NULL) {
                *p = '\0';
                start = atoi(argv[j]);
                stop = atoi(p+1);
            } else {
                start = stop = atoi(argv[j]);
            }
            while(start <= stop) clusterAddSlot(n, start++);
        }

        sdsfreesplitres(argv,argc);
    }
    /* Config sanity check */
    if (server.cluster->myself == NULL) goto fmterr;

    zfree(line);
    fclose(fp);

    serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);

    /* Something that should never happen: currentEpoch smaller than
     * the max epoch found in the nodes configuration. However we handle this
     * as some form of protection against manual editing of critical files. */
    if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
        server.cluster->currentEpoch = clusterGetMaxEpoch();
    }
    return C_OK;

fmterr:
    serverLog(LL_WARNING,
        "Unrecoverable error: corrupted cluster config file.");
    zfree(line);
    if (fp) fclose(fp);
    exit(1);
}

/* Cluster node configuration is exactly the same as CLUSTER NODES output.
 *
 * This function writes the node config and returns 0; on error -1
 * is returned.
 *
 * Note: we need to write the file in an atomic way from the point of view
 * of the POSIX filesystem semantics, so that if the server is stopped
 * or crashes during the write, we'll end up with either the old file or the
 * new one. Since we have the full payload to write available, we can use
 * a single write to write the whole file. If the pre-existing file was
 * bigger, we pad our payload with newlines (which are ignored anyway) and
 * truncate the file afterward. */
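/* A worked example of the scheme above (sizes made up for illustration):
 * if the old nodes.conf is 100 bytes and the new payload is 60 bytes, the
 * payload is grown to 100 bytes with bytes 60..99 set to '\n', written with
 * a single write(2), and the file is finally truncated back to 60 bytes.
 * A crash before the write leaves the old file intact; a crash before the
 * ftruncate() leaves the new content plus harmless trailing newlines. */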
int clusterSaveConfig(int do_fsync) {
    sds ci;
    size_t content_size;
    struct stat sb;
    int fd;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;

    /* Get the nodes description and concatenate our "vars" directive to
     * save currentEpoch and lastVoteEpoch. */
    ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
    ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
        (unsigned long long) server.cluster->currentEpoch,
        (unsigned long long) server.cluster->lastVoteEpoch);
    content_size = sdslen(ci);

    if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
        == -1) goto err;

    /* Pad the new payload if the existing file length is greater. */
    if (fstat(fd,&sb) != -1) {
        if (sb.st_size > (off_t)content_size) {
            ci = sdsgrowzero(ci,sb.st_size);
            memset(ci+content_size,'\n',sb.st_size-content_size);
        }
    }
    if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
    if (do_fsync) {
        server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
        fsync(fd);
    }

    /* Truncate the file if needed to remove the final \n padding that
     * is just garbage. */
    if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
        /* ftruncate() failing is not a critical error. */
    }
    close(fd);
    sdsfree(ci);
    return 0;

err:
    if (fd != -1) close(fd);
    sdsfree(ci);
    return -1;
}

void clusterSaveConfigOrDie(int do_fsync) {
    if (clusterSaveConfig(do_fsync) == -1) {
        serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
        exit(1);
    }
}

/* Lock the cluster config using flock(), and leak the file descriptor used
 * to acquire the lock, so that the file will stay locked for as long as the
 * process is running.
 *
 * This works because we always update nodes.conf with a new version
 * in-place, reopening the file and writing to it (later adjusting
 * the length with ftruncate()).
 *
 * On success C_OK is returned, otherwise an error is logged and
 * the function returns C_ERR to signal a lock was not acquired. */
int clusterLockConfig(char *filename) {
/* flock() does not exist on Solaris,
 * and an fcntl-based solution won't help, as we constantly re-open that
 * file, which will release _all_ locks anyway.
 */
#if !defined(__sun)
    /* To lock it, we need to open the file in a way it is created if
     * it does not exist, otherwise there is a race condition with other
     * processes. */
    int fd = open(filename,O_WRONLY|O_CREAT,0644);
    if (fd == -1) {
        serverLog(LL_WARNING,
            "Can't open %s in order to acquire a lock: %s",
            filename, strerror(errno));
        return C_ERR;
    }

    if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
        if (errno == EWOULDBLOCK) {
            serverLog(LL_WARNING,
                 "Sorry, the cluster configuration file %s is already used "
                 "by a different Redis Cluster node. Please make sure that "
                 "different nodes use different cluster configuration "
                 "files.", filename);
        } else {
            serverLog(LL_WARNING,
                "Impossible to lock %s: %s", filename, strerror(errno));
        }
        close(fd);
        return C_ERR;
    }
    /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
     * lock to the file as long as the process exists. */
#endif /* __sun */

    return C_OK;
}

void clusterInit(void) {
    int saveconf = 0;

    server.cluster = zmalloc(sizeof(clusterState));
    server.cluster->myself = NULL;
    server.cluster->currentEpoch = 0;
    server.cluster->state = CLUSTER_FAIL;
    server.cluster->size = 1;
    server.cluster->todo_before_sleep = 0;
    server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType,NULL);
    server.cluster->failover_auth_time = 0;
    server.cluster->failover_auth_count = 0;
    server.cluster->failover_auth_rank = 0;
    server.cluster->failover_auth_epoch = 0;
    server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
    server.cluster->lastVoteEpoch = 0;
    server.cluster->stats_bus_messages_sent = 0;
    server.cluster->stats_bus_messages_received = 0;
    memset(server.cluster->slots,0, sizeof(server.cluster->slots));
    clusterCloseAllSlots();

    /* Lock the cluster config file to make sure every node uses
     * its own nodes.conf. */
    if (clusterLockConfig(server.cluster_configfile) == C_ERR)
        exit(1);

    /* Load or create a new nodes configuration. */
    if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
        /* No configuration found. We will just use the random name provided
         * by the createClusterNode() function. */
        myself = server.cluster->myself =
            createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
        serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
            myself->name);
        clusterAddNode(myself);
        saveconf = 1;
    }
    if (saveconf) clusterSaveConfigOrDie(1);

    /* We need a listening TCP port for our cluster messaging needs. */
    server.cfd_count = 0;

    /* Port sanity check II
     * The other handshake port check is triggered too late to stop
     * us from trying to use a too-high cluster port number. */
    if (server.port > (65535-CLUSTER_PORT_INCR)) {
        serverLog(LL_WARNING, "Redis port number too high. "
                   "Cluster communication port is 10,000 port "
                   "numbers higher than your Redis port. "
                   "Your Redis port number must be "
                   "55535 or lower.");
        exit(1);
    }

    if (listenToPort(server.port+CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == C_ERR)
    {
        exit(1);
    } else {
        int j;

        for (j = 0; j < server.cfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
                clusterAcceptHandler, NULL) == AE_ERR)
                    serverPanic("Unrecoverable error creating Redis Cluster "
                                "file event.");
        }
    }

    /* The slots -> keys map is a sorted set. Init it. */
    server.cluster->slots_to_keys = zslCreate();

    /* Set myself->port to my listening port, we'll just need to discover
     * the IP address via MEET messages. */
    myself->port = server.port;

    server.cluster->mf_end = 0;
    resetManualFailover();
}

/* Reset a node performing a soft or hard reset:
 *
 * 1) All other nodes are forgotten.
 * 2) All the assigned / open slots are released.
 * 3) If the node is a slave, it turns into a master.
 * 4) Only for hard reset: a new Node ID is generated.
 * 5) Only for hard reset: currentEpoch and configEpoch are set to 0.
 * 6) The new configuration is saved and the cluster state updated.
 * 7) If the node was a slave, the whole data set is flushed away. */
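/* Note: this is the implementation behind the CLUSTER RESET command;
 * judging from the log message in the hard-reset branch below, hard=1
 * presumably corresponds to CLUSTER RESET HARD and hard=0 to the soft
 * variant (the command dispatch itself is not part of this excerpt). */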
void clusterReset(int hard) {
    dictIterator *di;
    dictEntry *de;
    int j;

    /* Turn into master. */
    if (nodeIsSlave(myself)) {
        clusterSetNodeAsMaster(myself);
        replicationUnsetMaster();
        emptyDb(NULL);
    }

    /* Close slots, reset manual failover state. */
    clusterCloseAllSlots();
    resetManualFailover();

    /* Unassign all the slots. */
    for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);

    /* Forget all the nodes, but myself. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == myself) continue;
        clusterDelNode(node);
    }
    dictReleaseIterator(di);

    /* Hard reset only: set epochs to 0, change node ID. */
    if (hard) {
        sds oldname;

        server.cluster->currentEpoch = 0;
        server.cluster->lastVoteEpoch = 0;
        myself->configEpoch = 0;
        serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");

        /* To change the Node ID we need to remove the old name from the
         * nodes table, change the ID, and re-add back with new name. */
        oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
        dictDelete(server.cluster->nodes,oldname);
        sdsfree(oldname);
        getRandomHexChars(myself->name, CLUSTER_NAMELEN);
        clusterAddNode(myself);
        serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
    }

    /* Make sure to persist the new config and update the state. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_UPDATE_STATE|
                         CLUSTER_TODO_FSYNC_CONFIG);
}

/* -----------------------------------------------------------------------------
 * CLUSTER communication link
 * -------------------------------------------------------------------------- */

clusterLink *createClusterLink(clusterNode *node) {
    clusterLink *link = zmalloc(sizeof(*link));
    link->ctime = mstime();
    link->sndbuf = sdsempty();
    link->rcvbuf = sdsempty();
    link->node = node;
    link->fd = -1;
    return link;
}

/* Free a cluster link. Note that this does not free the associated node,
 * of course. This function will just make sure that the original node
 * associated with this link will have the 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
    if (link->fd != -1) {
        aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
        aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
    }
    sdsfree(link->sndbuf);
    sdsfree(link->rcvbuf);
    if (link->node)
        link->node->link = NULL;
    close(link->fd);
    zfree(link);
}

#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    clusterLink *link;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    /* If the server is starting up, don't accept cluster connections:
     * UPDATE messages may interact with the database content. */
    if (server.masterhost == NULL && server.loading) return;

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }
        anetNonBlock(NULL,cfd);
        anetEnableTcpNoDelay(NULL,cfd);

        /* Use non-blocking I/O for cluster messages. */
        serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
        /* Create a link object we use to handle the connection.
         * It gets passed to the readable handler when data is available.
         * Initially the link->node pointer is set to NULL, as we don't know
         * which node it is yet; the right node is referenced once we know
         * the node's identity. */
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}

/* -----------------------------------------------------------------------------
 * Key space handling
 * -------------------------------------------------------------------------- */

/* We have 16384 hash slots. The hash slot of a given key is obtained
 * as the least significant 14 bits of the crc16 of the key.
 *
 * However if the key contains the {...} pattern, only the part between
 * { and } is hashed. This may be useful in the future to force certain
 * keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing between {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}
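
/* A few illustrative cases of the hash tag rules above (the resulting slot
 * is whatever crc16() yields; the point is only which bytes get hashed):
 *
 *   keyHashSlot("user1000",8)              hashes "user1000" (no '{').
 *   keyHashSlot("{user1000}.following",20) and
 *   keyHashSlot("{user1000}.followers",20) both hash only "user1000", so
 *       the two keys are guaranteed to land in the same slot.
 *   keyHashSlot("foo{}{bar}",10)           hashes the whole key, since the
 *       first '{' is immediately followed by '}' (empty tag).
 *   keyHashSlot("foo{{bar}}",10)           hashes "{bar", the content
 *       between the first '{' and the first '}'. */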

/* -----------------------------------------------------------------------------
 * CLUSTER node API
 * -------------------------------------------------------------------------- */

/* Create a new cluster node, with the specified flags.
 * If "nodename" is NULL, this is considered a first handshake and a random
 * node name is assigned to this node (it will be fixed later when we
 * receive the first pong).
 *
 * The node is created and returned to the user, but it is not automatically
 * added to the nodes hash table. */
clusterNode *createClusterNode(char *nodename, int flags) {
    clusterNode *node = zmalloc(sizeof(*node));

    if (nodename)
        memcpy(node->name, nodename, CLUSTER_NAMELEN);
    else
        getRandomHexChars(node->name, CLUSTER_NAMELEN);
    node->ctime = mstime();
    node->configEpoch = 0;
    node->flags = flags;
    memset(node->slots,0,sizeof(node->slots));
    node->numslots = 0;
    node->numslaves = 0;
    node->slaves = NULL;
    node->slaveof = NULL;
    node->ping_sent = node->pong_received = 0;
    node->fail_time = 0;
    node->link = NULL;
    memset(node->ip,0,sizeof(node->ip));
    node->port = 0;
    node->fail_reports = listCreate();
    node->voted_time = 0;
    node->orphaned_time = 0;
    node->repl_offset_time = 0;
    node->repl_offset = 0;
    listSetFreeMethod(node->fail_reports,zfree);
    return node;
}

/* This function is called every time we get a failure report from a node.
 * The side effect is to populate the fail_reports list (or to update
 * the timestamp of an existing report).
 *
 * 'failing' is the node that is in failure state according to the
 * 'sender' node.
 *
 * The function returns 0 if it just updates a timestamp of an existing
 * failure report from the same sender. 1 is returned if a new failure
 * report is created. */
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
    list *l = failing->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* If a failure report from the same sender already exists, just update
     * the timestamp. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) {
            fr->time = mstime();
            return 0;
        }
    }

    /* Otherwise create a new report. */
    fr = zmalloc(sizeof(*fr));
    fr->node = sender;
    fr->time = mstime();
    listAddNodeTail(l,fr);
    return 1;
}

/* Remove failure reports that are too old, where too old means reasonably
 * older than the global node timeout. Note that, in any case, for a node to
 * be flagged as FAIL we need to have a local PFAIL state older than the
 * global node timeout, so we don't just trust the number of failure
 * reports from other nodes. */
void clusterNodeCleanupFailureReports(clusterNode *node) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;
    mstime_t maxtime = server.cluster_node_timeout *
                     CLUSTER_FAIL_REPORT_VALIDITY_MULT;
    mstime_t now = mstime();

    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (now - fr->time > maxtime) listDelNode(l,ln);
    }
}

/* Remove the failure report for 'node' if it was previously considered
 * failing by 'sender'. This function is called when a node informs us via
 * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
 *
 * Note that this function is called relatively often, as it gets called even
 * when there are no nodes failing; it is O(N), but when the cluster is
 * fine the failure reports list is empty, so the function runs in constant
 * time.
 *
 * The function returns 1 if the failure report was found and removed.
 * Otherwise 0 is returned. */
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* Search for a failure report from this sender. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) break;
    }
    if (!ln) return 0; /* No failure report from this sender. */

    /* Remove the failure report. */
    listDelNode(l,ln);
    clusterNodeCleanupFailureReports(node);
    return 1;
}

/* Return the number of external nodes that believe 'node' is failing,
 * not including this node itself, which may also have a PFAIL or FAIL
 * state for this node. */
int clusterNodeFailureReportsCount(clusterNode *node) {
    clusterNodeCleanupFailureReports(node);
    return listLength(node->fail_reports);
}

int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
    int j;

    for (j = 0; j < master->numslaves; j++) {
        if (master->slaves[j] == slave) {
            if ((j+1) < master->numslaves) {
                int remaining_slaves = (master->numslaves - j) - 1;
                memmove(master->slaves+j,master->slaves+(j+1),
                        (sizeof(*master->slaves) * remaining_slaves));
            }
            master->numslaves--;
            if (master->numslaves == 0)
                master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
            return C_OK;
        }
    }
    return C_ERR;
}

int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
    int j;

    /* If it's already a slave, don't add it again. */
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] == slave) return C_ERR;
    master->slaves = zrealloc(master->slaves,
        sizeof(clusterNode*)*(master->numslaves+1));
    master->slaves[master->numslaves] = slave;
    master->numslaves++;
    master->flags |= CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}

int clusterCountNonFailingSlaves(clusterNode *n) {
    int j, okslaves = 0;

    for (j = 0; j < n->numslaves; j++)
        if (!nodeFailed(n->slaves[j])) okslaves++;
    return okslaves;
}

/* Low level cleanup of the node structure. Only called by clusterDelNode(). */
void freeClusterNode(clusterNode *n) {
    sds nodename;
    int j;

    /* If the node has associated slaves, we have to set
     * all the slaves->slaveof fields to NULL (unknown). */
    for (j = 0; j < n->numslaves; j++)
        n->slaves[j]->slaveof = NULL;

    /* Remove this node from the list of slaves of its master. */
    if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);

    /* Unlink from the set of nodes. */
    nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
    serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
    sdsfree(nodename);

    /* Release link and associated data structures. */
    if (n->link) freeClusterLink(n->link);
    listRelease(n->fail_reports);
    zfree(n->slaves);
    zfree(n);
}

/* Add a node to the nodes hash table */
int clusterAddNode(clusterNode *node) {
    int retval;

    retval = dictAdd(server.cluster->nodes,
            sdsnewlen(node->name,CLUSTER_NAMELEN), node);
    return (retval == DICT_OK) ? C_OK : C_ERR;
}

/* Remove a node from the cluster. The function performs the high level
 * cleanup, calling freeClusterNode() for the low level cleanup.
 * Here we do the following:
 *
 * 1) Mark all the slots handled by it as unassigned.
 * 2) Remove all the failure reports sent by this node and referenced by
 *    other nodes.
 * 3) Free the node with freeClusterNode() that will in turn remove it
 *    from the hash table and from the list of slaves of its master, if
 *    it is a slave node.
 */
void clusterDelNode(clusterNode *delnode) {
    int j;
    dictIterator *di;
    dictEntry *de;

    /* 1) Mark slots as unassigned. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (server.cluster->importing_slots_from[j] == delnode)
            server.cluster->importing_slots_from[j] = NULL;
        if (server.cluster->migrating_slots_to[j] == delnode)
            server.cluster->migrating_slots_to[j] = NULL;
        if (server.cluster->slots[j] == delnode)
            clusterDelSlot(j);
    }

    /* 2) Remove failure reports. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == delnode) continue;
        clusterNodeDelFailureReport(node,delnode);
    }
    dictReleaseIterator(di);

    /* 3) Free the node, unlinking it from the cluster. */
    freeClusterNode(delnode);
}

/* Node lookup by name */
clusterNode *clusterLookupNode(char *name) {
    sds s = sdsnewlen(name, CLUSTER_NAMELEN);
    dictEntry *de;

    de = dictFind(server.cluster->nodes,s);
    sdsfree(s);
    if (de == NULL) return NULL;
    return dictGetVal(de);
}

/* This is only used after the handshake. When we connect to a given IP/PORT
 * as a result of CLUSTER MEET, we don't have the node name yet, so we
 * pick a random one, and fix it with this function when we receive the
 * PONG reply. */
void clusterRenameNode(clusterNode *node, char *newname) {
    int retval;
    sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);

    serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
        node->name, newname);
    retval = dictDelete(server.cluster->nodes, s);
    sdsfree(s);
    serverAssert(retval == DICT_OK);
    memcpy(node->name, newname, CLUSTER_NAMELEN);
    clusterAddNode(node);
}

/* -----------------------------------------------------------------------------
 * CLUSTER config epoch handling
 * -------------------------------------------------------------------------- */

/* Return the greatest configEpoch found in the cluster, or the current
 * epoch if greater than any node configEpoch. */
uint64_t clusterGetMaxEpoch(void) {
    uint64_t max = 0;
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        if (node->configEpoch > max) max = node->configEpoch;
    }
    dictReleaseIterator(di);
    if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
    return max;
}

/* If this node's epoch is zero, or is not already the greatest across the
 * cluster (from the POV of the local configuration), this function will:
 *
 * 1) Generate a new config epoch, incrementing the current epoch.
 * 2) Assign the new epoch to this node, WITHOUT any consensus.
 * 3) Persist the configuration on disk before sending packets with the
 *    new configuration.
 *
 * If the new config epoch is generated and assigned, C_OK is returned,
 * otherwise C_ERR is returned (since the node already has the greatest
 * configuration epoch around) and no operation is performed.
 *
 * Important note: this function violates the principle that config epochs
 * should be generated with consensus and should be unique across the cluster.
 * However Redis Cluster uses these auto-generated new config epochs in two
 * cases:
 *
 * 1) When slots are closed after importing. Otherwise resharding would be
 *    too expensive.
 * 2) When CLUSTER FAILOVER is called with options that force a slave to
 *    failover its master even if there is no master majority able to
 *    create a new configuration epoch.
 *
 * Redis Cluster will not explode using this function, even in the case of
 * a collision between this node and another node, generating the same
 * configuration epoch unilaterally, because the config epoch conflict
 * resolution algorithm will eventually move colliding nodes to different
 * config epochs. However using this function may violate the "last failover
 * wins" rule, so it should only be used with care. */
int clusterBumpConfigEpochWithoutConsensus(void) {
    uint64_t maxEpoch = clusterGetMaxEpoch();

    if (myself->configEpoch == 0 ||
        myself->configEpoch != maxEpoch)
    {
        server.cluster->currentEpoch++;
        myself->configEpoch = server.cluster->currentEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);
        serverLog(LL_WARNING,
            "New configEpoch set to %llu",
            (unsigned long long) myself->configEpoch);
        return C_OK;
    } else {
        return C_ERR;
    }
}

/* This function is called when this node is a master, and we receive from
 * another master a configuration epoch that is equal to our configuration
 * epoch.
 *
 * BACKGROUND
 *
 * It is not possible for different slaves to get the same config
 * epoch during a failover election, because a slave needs the votes of
 * a majority of masters. However when we perform a manual resharding of
 * the cluster, the node will assign a configuration epoch to itself without
 * asking for agreement. Usually resharding happens when the cluster is
 * working well and is supervised by the sysadmin, however it is possible
 * for a failover to happen exactly while the node we are resharding a slot
 * to assigns itself a new configuration epoch, but before it is able to
 * propagate it.
 *
 * So technically it is possible in this condition that two nodes end up
 * with the same configuration epoch.
 *
 * Another possibility is that there are bugs in the implementation causing
 * this to happen.
 *
 * Moreover when a new cluster is created, all the nodes start with the same
 * configEpoch. This collision resolution code allows nodes to automatically
 * end up with a different configEpoch at startup.
 *
 * In all the cases, we want a mechanism that resolves this issue
 * automatically, as a safeguard. The same configuration epoch for masters
 * serving different sets of slots is not harmful, but it is if the nodes
 * end up serving the same slots for some reason (manual errors or software
 * bugs) without a proper failover procedure.
 *
 * In general we want a system that eventually always ends with different
 * masters having different configuration epochs whatever happened, since
 * nothing is worse than a split-brain condition in a distributed system.
 *
 * BEHAVIOR
 *
 * When this function gets called, what happens is that if this node
 * has the lexicographically smaller Node ID compared to the other node
 * with the conflicting epoch (the 'sender' node), it will assign itself
 * the greatest configuration epoch currently detected among nodes plus 1.
 *
 * This means that even if there are multiple nodes colliding, the node
 * with the greatest Node ID never moves forward, so eventually all the nodes
 * end with a different configuration epoch.
 */
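/* A small worked example of the behavior above (node IDs shortened for
 * readability; real IDs are 40 hex chars): masters A="aaa..." and
 * B="bbb..." both hold configEpoch 5 while currentEpoch is 5. When A (the
 * lexicographically smaller ID) processes a packet from B, it bumps
 * currentEpoch to 6 and takes configEpoch 6; B never moves, so after one
 * round the collision is resolved. With three colliding nodes the same
 * process simply takes a few more rounds to converge. */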
void clusterHandleConfigEpochCollision(clusterNode *sender) {
    /* Prerequisites: nodes have the same configEpoch and are both masters. */
    if (sender->configEpoch != myself->configEpoch ||
        !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
    /* Don't act if the colliding node has a smaller Node ID. */
    if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
    /* Get the next ID available at the best of this node knowledge. */
    server.cluster->currentEpoch++;
    myself->configEpoch = server.cluster->currentEpoch;
    clusterSaveConfigOrDie(1);
    serverLog(LL_VERBOSE,
        "WARNING: configEpoch collision with node %.40s."
        " configEpoch set to %llu",
        sender->name,
        (unsigned long long) myself->configEpoch);
}

/* -----------------------------------------------------------------------------
 * CLUSTER nodes blacklist
 *
 * The nodes blacklist is just a way to ensure that a given node with a given
 * Node ID is not re-added before some time has elapsed (this time is
 * specified in seconds in CLUSTER_BLACKLIST_TTL).
 *
 * This is useful when we want to remove a node from the cluster completely:
 * when CLUSTER FORGET is called, it also puts the node into the blacklist so
 * that even if we receive gossip messages from other nodes that still
 * remember the node we want to remove, we don't re-add it before some time.
 *
 * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, this means
 * that redis-trib has 60 seconds to send CLUSTER FORGET messages to nodes
 * in the cluster without the problem of other nodes re-adding the node to
 * nodes we already sent the FORGET command to.
 *
 * The data structure used is a hash table with an sds string representing
 * the node ID as key, and the time when it is ok to re-add the node as
 * value.
 * -------------------------------------------------------------------------- */
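
/* For context, the consumer of this blacklist is visible later in this
 * file: clusterProcessGossipSection() refuses to start a handshake with a
 * gossiped node whose ID is still blacklisted (the !clusterBlacklistExists()
 * check), which is what prevents a forgotten node from being re-added via
 * gossip during the TTL window. */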

#define CLUSTER_BLACKLIST_TTL 60      /* 1 minute. */

/* Before the addNode() or Exists() operations, we always remove expired
 * entries from the black list. This is an O(N) operation but it is not a
 * problem since add / exists operations are called very infrequently and
 * the hash table is supposed to contain very few elements at most.
 * However without the cleanup, during long uptimes and with some automated
 * node add/removal procedures, entries could accumulate. */
void clusterBlacklistCleanup(void) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes_black_list);
    while((de = dictNext(di)) != NULL) {
        int64_t expire = dictGetUnsignedIntegerVal(de);

        if (expire < server.unixtime)
            dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
    }
    dictReleaseIterator(di);
}

/* Cleanup the blacklist and add a new node ID to the black list. */
void clusterBlacklistAddNode(clusterNode *node) {
    dictEntry *de;
    sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);

    clusterBlacklistCleanup();
    if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
        /* If the key was added, duplicate the sds string representation of
         * the key for the next lookup. We'll free it at the end. */
        id = sdsdup(id);
    }
    de = dictFind(server.cluster->nodes_black_list,id);
    dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
    sdsfree(id);
}

/* Return non-zero if the specified node ID exists in the blacklist.
 * You don't need to pass an sds string here, any pointer to 40 bytes
 * will work. */
int clusterBlacklistExists(char *nodeid) {
    sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
    int retval;

    clusterBlacklistCleanup();
    retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
    sdsfree(id);
    return retval;
}

/* -----------------------------------------------------------------------------
 * CLUSTER messages exchange - PING/PONG and gossip
 * -------------------------------------------------------------------------- */

/* This function checks if a given node should be marked as FAIL.
 * It happens if the following conditions are met:
 *
 * 1) We received enough failure reports from other master nodes via gossip.
 *    Enough means that the majority of the masters signaled the node is
 *    down recently.
 * 2) We believe this node is in PFAIL state.
 *
 * If a failure is detected we also inform the whole cluster about this
 * event trying to force every other node to set the FAIL flag for the node.
 *
 * Note that the form of agreement used here is weak, as we collect the
 * state of the majority of masters over a window of time, and even if we
 * force agreement by propagating the FAIL message, because of partitions
 * we may not reach every node. However:
 *
 * 1) Either we reach the majority and eventually the FAIL state will
 *    propagate to all the cluster.
 * 2) Or there is no majority so no slave promotion will be authorized and
 *    the FAIL flag will be cleared after some time.
 */
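/* A quick numeric example of the quorum computed below: with
 * server.cluster->size == 5, needed_quorum is 5/2+1 = 3. If this node is
 * itself a master, its own PFAIL view counts as one of the three, so two
 * failure reports gossiped by other masters are enough to flip the node
 * from PFAIL to FAIL. */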
void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;
    int needed_quorum = (server.cluster->size / 2) + 1;

    if (!nodeTimedOut(node)) return; /* We can reach it. */
    if (nodeFailed(node)) return; /* Already FAILing. */

    failures = clusterNodeFailureReportsCount(node);
    /* Also count myself as a voter if I'm a master. */
    if (nodeIsMaster(myself)) failures++;
    if (failures < needed_quorum) return; /* No weak agreement from masters. */

    serverLog(LL_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Mark the node as failing. */
    node->flags &= ~CLUSTER_NODE_PFAIL;
    node->flags |= CLUSTER_NODE_FAIL;
    node->fail_time = mstime();

    /* Broadcast the failing node name to everybody, forcing all the other
     * reachable nodes to flag the node as FAIL. */
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}

/* This function is called only if a node is marked as FAIL, but we are able
 * to reach it again. It checks whether the conditions are met to undo the
 * FAIL state. */
void clearNodeFailureIfNeeded(clusterNode *node) {
    mstime_t now = mstime();

    serverAssert(nodeFailed(node));

    /* For slaves we always clear the FAIL flag if we can contact the
     * node again. */
    if (nodeIsSlave(node) || node->numslots == 0) {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: %s is reachable again.",
                node->name,
                nodeIsSlave(node) ? "slave" : "master without slots");
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
    }

    /* If it is a master and...
     * 1) The FAIL state is old enough.
     * 2) It is still serving slots from our point of view (not failed over).
     * Apparently no one is going to fix these slots, so clear the FAIL flag. */
    if (nodeIsMaster(node) && node->numslots > 0 &&
        (now - node->fail_time) >
        (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
    {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
                node->name);
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
    }
}

/* Return true if we already have a node in HANDSHAKE state matching the
 * specified ip address and port number. This function is used in order to
 * avoid adding a new handshake node for the same address multiple times. */
int clusterHandshakeInProgress(char *ip, int port) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (!nodeInHandshake(node)) continue;
        if (!strcasecmp(node->ip,ip) && node->port == port) break;
    }
    dictReleaseIterator(di);
    return de != NULL;
}

/* Start a handshake with the specified address if there is not one
 * already in progress. Returns non-zero if the handshake was actually
 * started. On error zero is returned and errno is set to one of the
 * following values:
 *
 * EAGAIN - There is already a handshake in progress for this address.
 * EINVAL - IP or port are not valid. */
int clusterStartHandshake(char *ip, int port) {
    clusterNode *n;
    char norm_ip[NET_IP_STR_LEN];
    struct sockaddr_storage sa;

    /* IP sanity check */
    if (inet_pton(AF_INET,ip,
            &(((struct sockaddr_in *)&sa)->sin_addr)))
    {
        sa.ss_family = AF_INET;
    } else if (inet_pton(AF_INET6,ip,
            &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
    {
        sa.ss_family = AF_INET6;
    } else {
        errno = EINVAL;
        return 0;
    }

    /* Port sanity check */
    if (port <= 0 || port > (65535-CLUSTER_PORT_INCR)) {
        errno = EINVAL;
        return 0;
    }

    /* Set norm_ip as the normalized string representation of the node
     * IP address. */
    memset(norm_ip,0,NET_IP_STR_LEN);
    if (sa.ss_family == AF_INET)
        inet_ntop(AF_INET,
            (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
            norm_ip,NET_IP_STR_LEN);
    else
        inet_ntop(AF_INET6,
            (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
            norm_ip,NET_IP_STR_LEN);

    if (clusterHandshakeInProgress(norm_ip,port)) {
        errno = EAGAIN;
        return 0;
    }

    /* Add the node with a random name (NULL as first argument to
     * createClusterNode()). Everything will be fixed during the
     * handshake. */
    n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    clusterAddNode(n);
    return 1;
}
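
/* Usage note: within this file, the handshake is started from gossip
 * processing (clusterProcessGossipSection() below calls
 * clusterStartHandshake() for unknown gossiped nodes) and, in the part of
 * cluster.c not shown in this excerpt, presumably from the CLUSTER MEET
 * command path as well. */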

/* Process the gossip section of PING or PONG packets.
 * Note that this function assumes that the packet is already sanity-checked
 * by the caller with regard to the length of the gossip section, not its
 * content. */
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

    while(count--) {
        uint16_t flags = ntohs(g->flags);
        clusterNode *node;
        sds ci;

        ci = representClusterNodeFlags(sdsempty(), flags);
        serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d %s",
            g->nodename,
            g->ip,
            ntohs(g->port),
            ci);
        sdsfree(ci);
        /* Update our state according to the gossip sections */
        node = clusterLookupNode(g->nodename);
        if (node) {
            /* We already know this node.
               Handle failure reports, only when the sender is a master. */
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
                    if (clusterNodeAddFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                    markNodeAsFailingIfNeeded(node);
                } else {
                    if (clusterNodeDelFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

1333             /* If we already know this node, but it is not reachable, and
1334              * we see a different address in the gossip section from a node
1335              * that can talk with it, update the address and disconnect the
1336              * old link if any, so that we'll attempt to connect to the new
1337              * address. */
1338             if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
1339                 !(flags & CLUSTER_NODE_NOADDR) &&
1340                 !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1341                 (strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port)))
1342             {
1343                 if (node->link) freeClusterLink(node->link);
1344                 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
1345                 node->port = ntohs(g->port);
1346                 node->flags &= ~CLUSTER_NODE_NOADDR;
1347             }
1348         } else {
1349             /* If it's not in NOADDR state and we don't have it, we
1350              * start a handshake process against this IP/PORT pair.
1351              *
1352              * Note that we require that the sender of this gossip message
1353              * is a well known node in our cluster, otherwise we risk
1354              * joining another cluster. */
1355             if (sender &&
1356                 !(flags & CLUSTER_NODE_NOADDR) &&
1357                 !clusterBlacklistExists(g->nodename))
1358             {
1359                 clusterStartHandshake(g->ip,ntohs(g->port));
1360             }
1361         }
1362 
1363         /* Next node */
1364         g++;
1365     }
1366 }
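
/* Illustrative sketch (not from the upstream source): the gossip section is
 * a packed array of fixed-size clusterMsgDataGossip records following the
 * header, which is what makes the plain g++ walk above sufficient. Random
 * access to entry i is just pointer arithmetic on the first entry: */
static clusterMsgDataGossip *gossipEntrySketch(clusterMsg *hdr, uint16_t i) {
    clusterMsgDataGossip *first = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    return first + i; /* Advances by i * sizeof(clusterMsgDataGossip) bytes. */
}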
1367 
1368 /* IP -> string conversion. 'buf' is supposed to be at least 46 bytes. */
1369 void nodeIp2String(char *buf, clusterLink *link) {
1370     anetPeerToString(link->fd, buf, NET_IP_STR_LEN, NULL);
1371 }
1372 
1373 /* Update the node address to the IP address that can be extracted
1374  * from link->fd, and at the specified port.
1375  * Also disconnect the node link so that we'll connect again to the new
1376  * address.
1377  *
1378  * If the ip/port pair is already correct, no operation is performed at
1379  * all.
1380  *
1381  * The function returns 0 if the node address is still the same,
1382  * otherwise 1 is returned. */
1383 int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, int port) {
1384     char ip[NET_IP_STR_LEN] = {0};
1385 
1386     /* We don't proceed if the link is the same as the sender link, as this
1387      * function is designed to see if the node link is consistent with the
1388      * symmetric link that is used to receive PINGs from the node.
1389      *
1390      * As a side effect this function never frees the passed 'link', so
1391      * it is safe to call during packet processing. */
1392     if (link == node->link) return 0;
1393 
1394     nodeIp2String(ip,link);
1395     if (node->port == port && strcmp(ip,node->ip) == 0) return 0;
1396 
1397     /* IP / port is different, update it. */
1398     memcpy(node->ip,ip,sizeof(ip));
1399     node->port = port;
1400     if (node->link) freeClusterLink(node->link);
1401     node->flags &= ~CLUSTER_NODE_NOADDR;
1402     serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
1403         node->name, node->ip, node->port);
1404 
1405     /* Check if this is our master and we have to change the
1406      * replication target as well. */
1407     if (nodeIsSlave(myself) && myself->slaveof == node)
1408         replicationSetMaster(node->ip, node->port);
1409     return 1;
1410 }
1411 
1412 /* Reconfigure the specified node 'n' as a master. This function is called
1413  * when a node that we believed to be a slave is now acting as a master, in
1414  * order to update its state. */
1415 void clusterSetNodeAsMaster(clusterNode *n) {
1416     if (nodeIsMaster(n)) return;
1417 
1418     if (n->slaveof) {
1419         clusterNodeRemoveSlave(n->slaveof,n);
1420         if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
1421     }
1422     n->flags &= ~CLUSTER_NODE_SLAVE;
1423     n->flags |= CLUSTER_NODE_MASTER;
1424     n->slaveof = NULL;
1425 
1426     /* Update config and state. */
1427     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1428                          CLUSTER_TODO_UPDATE_STATE);
1429 }
1430 
1431 /* This function is called when we receive a master configuration via a
1432  * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
1433  * node, and the set of slots claimed under this configEpoch.
1434  *
1435  * What we do is to rebind the slots with a newer configuration compared to
1436  * our local configuration, and if needed, turn ourselves into a replica of the
1437  * node (see the function comments for more info).
1438  *
1439  * The 'sender' is the node for which we received a configuration update.
1440  * Sometimes it is not actually the "Sender" of the information, like in the
1441  * case we receive the info via an UPDATE packet. */
1442 void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
1443     int j;
1444     clusterNode *curmaster, *newmaster = NULL;
1445     /* The dirty slots list is a list of slots for which we lost ownership
1446      * while still having keys inside. This usually happens after a failover
1447      * or after a manual cluster reconfiguration operated by the admin.
1448      *
1449      * If the update message is not able to demote a master to slave (in this
1450      * case we'll resync with the master updating the whole key space), we
1451      * need to delete all the keys in the slots we lost ownership of. */
1452     uint16_t dirty_slots[CLUSTER_SLOTS];
1453     int dirty_slots_count = 0;
1454 
1455     /* Here we set curmaster to this node, or to its master if this
1456      * node is a slave. In the for loop below we are interested in
1457      * checking whether slots are taken away from curmaster. */
1458     curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;
1459 
1460     if (sender == myself) {
1461         serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
1462         return;
1463     }
1464 
1465     for (j = 0; j < CLUSTER_SLOTS; j++) {
1466         if (bitmapTestBit(slots,j)) {
1467             /* The slot is already bound to the sender of this message. */
1468             if (server.cluster->slots[j] == sender) continue;
1469 
1470             /* The slot is in importing state, it should be modified only
1471              * manually via redis-trib (example: a resharding is in progress
1472              * and the migrating side slot was already closed and is advertising
1473              * a new config. We still want the slot to be closed manually). */
1474             if (server.cluster->importing_slots_from[j]) continue;
1475 
1476             /* We rebind the slot to the new node claiming it if:
1477              * 1) The slot was unassigned or the new node claims it with a
1478              *    greater configEpoch.
1479              * 2) We are not currently importing the slot. */
1480             if (server.cluster->slots[j] == NULL ||
1481                 server.cluster->slots[j]->configEpoch < senderConfigEpoch)
1482             {
1483                 /* Was this slot mine, and still contains keys? Mark it as
1484                  * a dirty slot. */
1485                 if (server.cluster->slots[j] == myself &&
1486                     countKeysInSlot(j) &&
1487                     sender != myself)
1488                 {
1489                     dirty_slots[dirty_slots_count] = j;
1490                     dirty_slots_count++;
1491                 }
1492 
1493                 if (server.cluster->slots[j] == curmaster)
1494                     newmaster = sender;
1495                 clusterDelSlot(j);
1496                 clusterAddSlot(sender,j);
1497                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1498                                      CLUSTER_TODO_UPDATE_STATE|
1499                                      CLUSTER_TODO_FSYNC_CONFIG);
1500             }
1501         }
1502     }
1503 
1504     /* If at least one slot was reassigned from a node to another node
1505      * with a greater configEpoch, it is possible that:
1506      * 1) We are a master left without slots. This means that we were
1507      *    failed over and we should turn into a replica of the new
1508      *    master.
1509      * 2) We are a slave and our master is left without slots. We need
1510      *    to replicate to the new slots owner. */
1511     if (newmaster && curmaster->numslots == 0) {
1512         serverLog(LL_WARNING,
1513             "Configuration change detected. Reconfiguring myself "
1514             "as a replica of %.40s", sender->name);
1515         clusterSetMaster(sender);
1516         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1517                              CLUSTER_TODO_UPDATE_STATE|
1518                              CLUSTER_TODO_FSYNC_CONFIG);
1519     } else if (dirty_slots_count) {
1520         /* If we are here, we received an update message which removed
1521          * ownership of certain slots we still have keys in, but we are
1522          * still serving some slots, so this master node was not demoted to
1523          * a slave.
1524          *
1525          * In order to maintain a consistent state between keys and slots
1526          * we need to remove all the keys from the slots we lost. */
1527         for (j = 0; j < dirty_slots_count; j++)
1528             delKeysInSlot(dirty_slots[j]);
1529     }
1530 }
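
/* Illustrative sketch (not from the upstream source, but mirroring the
 * bitmap helpers declared at the top of this file): slot ownership above is
 * tracked one bit per slot, so the 16384 slots of CLUSTER_SLOTS fit in a
 * 2048 byte bitmap. Testing a single slot bit looks like: */
static int bitmapTestBitSketch(unsigned char *bitmap, int pos) {
    return (bitmap[pos/8] & (1<<(pos&7))) != 0; /* Byte pos/8, bit pos%8. */
}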
1531 
1532 /* When this function is called, there is a packet to process starting
1533  * at node->rcvbuf. Releasing the buffer is up to the caller, so this
1534  * function should just handle the higher level stuff of processing the
1535  * packet, modifying the cluster state if needed.
1536  *
1537  * The function returns 1 if the link is still valid after the packet
1538  * was processed, otherwise 0 if the link was freed since the packet
1539  * processing led to some inconsistency error (for instance a PONG
1540  * received from the wrong sender ID). */
1541 int clusterProcessPacket(clusterLink *link) {
1542     clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
1543     uint32_t totlen = ntohl(hdr->totlen);
1544     uint16_t type = ntohs(hdr->type);
1545 
1546     server.cluster->stats_bus_messages_received++;
1547     serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
1548         type, (unsigned long) totlen);
1549 
1550     /* Perform sanity checks */
1551     if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
1552     if (totlen > sdslen(link->rcvbuf)) return 1;
1553 
1554     if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
1555         /* Can't handle messages of different versions. */
1556         return 1;
1557     }
1558 
1559     uint16_t flags = ntohs(hdr->flags);
1560     uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
1561     clusterNode *sender;
1562 
1563     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1564         type == CLUSTERMSG_TYPE_MEET)
1565     {
1566         uint16_t count = ntohs(hdr->count);
1567         uint32_t explen; /* expected length of this packet */
1568 
1569         explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1570         explen += (sizeof(clusterMsgDataGossip)*count);
1571         if (totlen != explen) return 1;
1572     } else if (type == CLUSTERMSG_TYPE_FAIL) {
1573         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1574 
1575         explen += sizeof(clusterMsgDataFail);
1576         if (totlen != explen) return 1;
1577     } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
1578         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1579 
1580         explen += sizeof(clusterMsgDataPublish) -
1581                 8 +
1582                 ntohl(hdr->data.publish.msg.channel_len) +
1583                 ntohl(hdr->data.publish.msg.message_len);
1584         if (totlen != explen) return 1;
1585     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
1586                type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
1587                type == CLUSTERMSG_TYPE_MFSTART)
1588     {
1589         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1590 
1591         if (totlen != explen) return 1;
1592     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
1593         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1594 
1595         explen += sizeof(clusterMsgDataUpdate);
1596         if (totlen != explen) return 1;
1597     }
1598 
1599     /* Check if the sender is a known node. */
1600     sender = clusterLookupNode(hdr->sender);
1601     if (sender && !nodeInHandshake(sender)) {
1602         /* Update our currentEpoch if we see a newer epoch in the cluster. */
1603         senderCurrentEpoch = ntohu64(hdr->currentEpoch);
1604         senderConfigEpoch = ntohu64(hdr->configEpoch);
1605         if (senderCurrentEpoch > server.cluster->currentEpoch)
1606             server.cluster->currentEpoch = senderCurrentEpoch;
1607         /* Update the sender configEpoch if it is publishing a newer one. */
1608         if (senderConfigEpoch > sender->configEpoch) {
1609             sender->configEpoch = senderConfigEpoch;
1610             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1611                                  CLUSTER_TODO_FSYNC_CONFIG);
1612         }
1613         /* Update the replication offset info for this node. */
1614         sender->repl_offset = ntohu64(hdr->offset);
1615         sender->repl_offset_time = mstime();
1616         /* If we are a slave performing a manual failover and our master
1617          * sent its offset while already paused, populate the MF state. */
1618         if (server.cluster->mf_end &&
1619             nodeIsSlave(myself) &&
1620             myself->slaveof == sender &&
1621             hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
1622             server.cluster->mf_master_offset == 0)
1623         {
1624             server.cluster->mf_master_offset = sender->repl_offset;
1625             serverLog(LL_WARNING,
1626                 "Received replication offset for paused "
1627                 "master manual failover: %lld",
1628                 server.cluster->mf_master_offset);
1629         }
1630     }
1631 
1632     /* Initial processing of PING and MEET requests replying with a PONG. */
1633     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
1634         serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);
1635 
1636         /* We use incoming MEET messages in order to set the address
1637          * for 'myself', since only other cluster nodes will send us
1638          * MEET messages on handshakes, when the cluster joins, or
1639          * later if we changed address, and those nodes will use our
1640          * official address to connect to us. So obtaining this address
1641          * from the socket is a simple way to discover / update our own
1642          * address in the cluster without it being hardcoded in the config.
1643          *
1644          * However if we don't have an address at all, we update the address
1645          * even with a normal PING packet. If it's wrong it will be fixed
1646          * by MEET later. */
1647         if (type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') {
1648             char ip[NET_IP_STR_LEN];
1649 
1650             if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
1651                 strcmp(ip,myself->ip))
1652             {
1653                 memcpy(myself->ip,ip,NET_IP_STR_LEN);
1654                 serverLog(LL_WARNING,"IP address for this node updated to %s",
1655                     myself->ip);
1656                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1657             }
1658         }
1659 
1660         /* Add this node if it is new for us and the msg type is MEET.
1661          * In this stage we don't try to add the node with the right
1662          * flags, slaveof pointer, and so forth, as these details will be
1663          * resolved when we receive PONGs from the node. */
1664         if (!sender && type == CLUSTERMSG_TYPE_MEET) {
1665             clusterNode *node;
1666 
1667             node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
1668             nodeIp2String(node->ip,link);
1669             node->port = ntohs(hdr->port);
1670             clusterAddNode(node);
1671             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1672         }
1673 
1674         /* If this is a MEET packet from an unknown node, we still process
1675          * the gossip section here since we have to trust the sender because
1676          * of the message type. */
1677         if (!sender && type == CLUSTERMSG_TYPE_MEET)
1678             clusterProcessGossipSection(hdr,link);
1679 
1680         /* Anyway reply with a PONG */
1681         clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
1682     }
1683 
1684     /* PING, PONG, MEET: process config information. */
1685     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1686         type == CLUSTERMSG_TYPE_MEET)
1687     {
1688         serverLog(LL_DEBUG,"%s packet received: %p",
1689             type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
1690             (void*)link->node);
1691         if (link->node) {
1692             if (nodeInHandshake(link->node)) {
1693                 /* If we already have this node, try to change the
1694                  * IP/port of the node with the new one. */
1695                 if (sender) {
1696                     serverLog(LL_VERBOSE,
1697                         "Handshake: we already know node %.40s, "
1698                         "updating the address if needed.", sender->name);
1699                     if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
1700                     {
1701                         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1702                                              CLUSTER_TODO_UPDATE_STATE);
1703                     }
1704                     /* Free this node as we already have it. This will
1705                      * cause the link to be freed as well. */
1706                     clusterDelNode(link->node);
1707                     return 0;
1708                 }
1709 
1710                 /* The first thing to do is to replace the random name with
1711                  * the right node name if this was a handshake stage. */
1712                 clusterRenameNode(link->node, hdr->sender);
1713                 serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
1714                     link->node->name);
1715                 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
1716                 link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
1717                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1718             } else if (memcmp(link->node->name,hdr->sender,
1719                         CLUSTER_NAMELEN) != 0)
1720             {
1721                 /* If the reply has a non-matching node ID we
1722                  * disconnect this node and set it as not having an associated
1723                  * address. */
1724                 serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
1725                     link->node->name,
1726                     (int)(mstime()-(link->node->ctime)),
1727                     link->node->flags);
1728                 link->node->flags |= CLUSTER_NODE_NOADDR;
1729                 link->node->ip[0] = '\0';
1730                 link->node->port = 0;
1731                 freeClusterLink(link);
1732                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1733                 return 0;
1734             }
1735         }
1736 
1737         /* Update the node address if it changed. */
1738         if (sender && type == CLUSTERMSG_TYPE_PING &&
1739             !nodeInHandshake(sender) &&
1740             nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
1741         {
1742             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1743                                  CLUSTER_TODO_UPDATE_STATE);
1744         }
1745 
1746         /* Update our info about the node */
1747         if (link->node && type == CLUSTERMSG_TYPE_PONG) {
1748             link->node->pong_received = mstime();
1749             link->node->ping_sent = 0;
1750 
1751             /* The PFAIL condition can be reversed without external
1752              * help if it is momentary (that is, if it does not
1753              * turn into a FAIL state).
1754              *
1755              * The FAIL condition is also reversible under specific
1756              * conditions detected by clearNodeFailureIfNeeded(). */
1757             if (nodeTimedOut(link->node)) {
1758                 link->node->flags &= ~CLUSTER_NODE_PFAIL;
1759                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1760                                      CLUSTER_TODO_UPDATE_STATE);
1761             } else if (nodeFailed(link->node)) {
1762                 clearNodeFailureIfNeeded(link->node);
1763             }
1764         }
1765 
1766         /* Check for role switch: slave -> master or master -> slave. */
1767         if (sender) {
1768             if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
1769                 sizeof(hdr->slaveof)))
1770             {
1771                 /* Node is a master. */
1772                 clusterSetNodeAsMaster(sender);
1773             } else {
1774                 /* Node is a slave. */
1775                 clusterNode *master = clusterLookupNode(hdr->slaveof);
1776 
1777                 if (nodeIsMaster(sender)) {
1778                     /* Master turned into a slave! Reconfigure the node. */
1779                     clusterDelNodeSlots(sender);
1780                     sender->flags &= ~(CLUSTER_NODE_MASTER|
1781                                        CLUSTER_NODE_MIGRATE_TO);
1782                     sender->flags |= CLUSTER_NODE_SLAVE;
1783 
1784                     /* Update config and state. */
1785                     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1786                                          CLUSTER_TODO_UPDATE_STATE);
1787                 }
1788 
1789                 /* Master node changed for this slave? */
1790                 if (master && sender->slaveof != master) {
1791                     if (sender->slaveof)
1792                         clusterNodeRemoveSlave(sender->slaveof,sender);
1793                     clusterNodeAddSlave(master,sender);
1794                     sender->slaveof = master;
1795 
1796                     /* Update config. */
1797                     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1798                 }
1799             }
1800         }
1801 
1802         /* Update our info about served slots.
1803          *
1804          * Note: this MUST happen after we update the master/slave state
1805          * so that CLUSTER_NODE_MASTER flag will be set. */
1806 
1807         /* Many checks are only needed if the set of served slots this
1808          * instance claims is different compared to the set of slots we have
1809          * for it. Check this ASAP to avoid other computationally expensive
1810          * checks later. */
1811         clusterNode *sender_master = NULL; /* Sender or its master if slave. */
1812         int dirty_slots = 0; /* Sender claimed slots don't match my view? */
1813 
1814         if (sender) {
1815             sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
1816             if (sender_master) {
1817                 dirty_slots = memcmp(sender_master->slots,
1818                         hdr->myslots,sizeof(hdr->myslots)) != 0;
1819             }
1820         }
1821 
1822         /* 1) If the sender of the message is a master, and we detected that
1823          *    the set of slots it claims changed, scan the slots to see if we
1824          *    need to update our configuration. */
1825         if (sender && nodeIsMaster(sender) && dirty_slots)
1826             clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
1827 
1828         /* 2) We also check for the reverse condition, that is, the sender
1829          *    claims to serve slots we know are served by a master with a
1830          *    greater configEpoch. If this happens we inform the sender.
1831          *
1832          * This is useful because sometimes after a partition heals, a
1833          * reappearing master may be the last one to claim a given set of
1834          * hash slots, but with a configuration that other instances know to
1835          * be deprecated. Example:
1836          *
1837          * A and B are master and slave for slots 1,2,3.
1838          * A is partitioned away, B gets promoted.
1839          * B is partitioned away, and A returns available.
1840          *
1841          * Usually B would PING A publishing its set of served slots and its
1842          * configEpoch, but because of the partition B can't inform A of the
1843          * new configuration, so other nodes that have an updated table must
1844          * do it. In this way A will stop acting as a master (or can try to
1845          * failover if the conditions to win the election are met). */
1846         if (sender && dirty_slots) {
1847             int j;
1848 
1849             for (j = 0; j < CLUSTER_SLOTS; j++) {
1850                 if (bitmapTestBit(hdr->myslots,j)) {
1851                     if (server.cluster->slots[j] == sender ||
1852                         server.cluster->slots[j] == NULL) continue;
1853                     if (server.cluster->slots[j]->configEpoch >
1854                         senderConfigEpoch)
1855                     {
1856                         serverLog(LL_VERBOSE,
1857                             "Node %.40s has old slots configuration, sending "
1858                             "an UPDATE message about %.40s",
1859                                 sender->name, server.cluster->slots[j]->name);
1860                         clusterSendUpdate(sender->link,
1861                             server.cluster->slots[j]);
1862 
1863                         /* TODO: instead of exiting the loop send every other
1864                          * UPDATE packet for other nodes that are the new owner
1865                          * of sender's slots. */
1866                         break;
1867                     }
1868                 }
1869             }
1870         }
1871 
1872         /* If our config epoch collides with the sender's try to fix
1873          * the problem. */
1874         if (sender &&
1875             nodeIsMaster(myself) && nodeIsMaster(sender) &&
1876             senderConfigEpoch == myself->configEpoch)
1877         {
1878             clusterHandleConfigEpochCollision(sender);
1879         }
1880 
1881         /* Get info from the gossip section */
1882         if (sender) clusterProcessGossipSection(hdr,link);
1883     } else if (type == CLUSTERMSG_TYPE_FAIL) {
1884         clusterNode *failing;
1885 
1886         if (sender) {
1887             failing = clusterLookupNode(hdr->data.fail.about.nodename);
1888             if (failing &&
1889                 !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
1890             {
1891                 serverLog(LL_NOTICE,
1892                     "FAIL message received from %.40s about %.40s",
1893                     hdr->sender, hdr->data.fail.about.nodename);
1894                 failing->flags |= CLUSTER_NODE_FAIL;
1895                 failing->fail_time = mstime();
1896                 failing->flags &= ~CLUSTER_NODE_PFAIL;
1897                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1898                                      CLUSTER_TODO_UPDATE_STATE);
1899             }
1900         } else {
1901             serverLog(LL_NOTICE,
1902                 "Ignoring FAIL message from unknown node %.40s about %.40s",
1903                 hdr->sender, hdr->data.fail.about.nodename);
1904         }
1905     } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
1906         robj *channel, *message;
1907         uint32_t channel_len, message_len;
1908 
1909         /* Don't bother creating useless objects if there are no
1910          * Pub/Sub subscribers. */
1911         if (dictSize(server.pubsub_channels) ||
1912            listLength(server.pubsub_patterns))
1913         {
1914             channel_len = ntohl(hdr->data.publish.msg.channel_len);
1915             message_len = ntohl(hdr->data.publish.msg.message_len);
1916             channel = createStringObject(
1917                         (char*)hdr->data.publish.msg.bulk_data,channel_len);
1918             message = createStringObject(
1919                         (char*)hdr->data.publish.msg.bulk_data+channel_len,
1920                         message_len);
1921             pubsubPublishMessage(channel,message);
1922             decrRefCount(channel);
1923             decrRefCount(message);
1924         }
1925     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
1926         if (!sender) return 1;  /* We don't know that node. */
1927         clusterSendFailoverAuthIfNeeded(sender,hdr);
1928     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
1929         if (!sender) return 1;  /* We don't know that node. */
1930         /* We consider this vote only if the sender is a master serving
1931          * a non-zero number of slots, and its currentEpoch is greater than
1932          * or equal to the epoch in which this node started the election. */
1933         if (nodeIsMaster(sender) && sender->numslots > 0 &&
1934             senderCurrentEpoch >= server.cluster->failover_auth_epoch)
1935         {
1936             server.cluster->failover_auth_count++;
1937             /* Maybe we reached a quorum here, set a flag to make sure
1938              * we check ASAP. */
1939             clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
1940         }
1941     } else if (type == CLUSTERMSG_TYPE_MFSTART) {
1942         /* This message is acceptable only if I'm a master and the sender
1943          * is one of my slaves. */
1944         if (!sender || sender->slaveof != myself) return 1;
1945         /* Manual failover requested from slaves. Initialize the state
1946          * accordingly. */
1947         resetManualFailover();
1948         server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
1949         server.cluster->mf_slave = sender;
1950         pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*2));
1951         serverLog(LL_WARNING,"Manual failover requested by slave %.40s.",
1952             sender->name);
1953     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
1954         clusterNode *n; /* The node the update is about. */
1955         uint64_t reportedConfigEpoch =
1956                     ntohu64(hdr->data.update.nodecfg.configEpoch);
1957 
1958         if (!sender) return 1;  /* We don't know the sender. */
1959         n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
1960         if (!n) return 1;   /* We don't know the reported node. */
1961         if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
1962 
1963         /* If in our current config the node is a slave, set it as a master. */
1964         if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
1965 
1966         /* Update the node's configEpoch. */
1967         n->configEpoch = reportedConfigEpoch;
1968         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1969                              CLUSTER_TODO_FSYNC_CONFIG);
1970 
1971         /* Check the bitmap of served slots and update our
1972          * config accordingly. */
1973         clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
1974             hdr->data.update.nodecfg.slots);
1975     } else {
1976         serverLog(LL_WARNING,"Received unknown packet type: %d", type);
1977     }
1978     return 1;
1979 }
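
/* Illustrative sketch (not from the upstream source): the fixed-length
 * checks at the top of clusterProcessPacket() all follow the same pattern.
 * For a PING/PONG/MEET carrying 'count' gossip entries the only acceptable
 * total length is the header without the data union plus one gossip record
 * per entry: */
static uint32_t clusterPingLenSketch(uint16_t count) {
    return sizeof(clusterMsg) - sizeof(union clusterMsgData) +
           count * (uint32_t) sizeof(clusterMsgDataGossip);
}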
1980 
1981 /* This function is called when we detect that the link with this node is
1982    lost. We set the node as no longer connected. The Cluster Cron will
1983    detect this condition and will try to get it connected again.
1984 
1985    If instead the node is a temporary node used to accept a query, we
1986    completely free the node on error. */
1987 void handleLinkIOError(clusterLink *link) {
1988     freeClusterLink(link);
1989 }
1990 
1991 /* Send data. This is handled using a trivial send buffer that gets
1992  * consumed by write(). We don't try to optimize this for speed too much
1993  * as this is a very low traffic channel. */
1994 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1995     clusterLink *link = (clusterLink*) privdata;
1996     ssize_t nwritten;
1997     UNUSED(el);
1998     UNUSED(mask);
1999 
2000     nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
2001     if (nwritten <= 0) {
2002         serverLog(LL_DEBUG,"I/O error writing to node link: %s",
2003             strerror(errno));
2004         handleLinkIOError(link);
2005         return;
2006     }
2007     sdsrange(link->sndbuf,nwritten,-1);
2008     if (sdslen(link->sndbuf) == 0)
2009         aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
2010 }
2011 
2012 /* Read data. Try to read the first field of the header first to obtain the
2013  * full length of the packet. When a whole packet is in memory this function
2014  * calls the function that processes the packet. And so forth. */
2015 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2016     char buf[sizeof(clusterMsg)];
2017     ssize_t nread;
2018     clusterMsg *hdr;
2019     clusterLink *link = (clusterLink*) privdata;
2020     unsigned int readlen, rcvbuflen;
2021     UNUSED(el);
2022     UNUSED(mask);
2023 
2024     while(1) { /* Read as long as there is data to read. */
2025         rcvbuflen = sdslen(link->rcvbuf);
2026         if (rcvbuflen < 8) {
2027             /* First, obtain the first 8 bytes to get the full message
2028              * length. */
2029             readlen = 8 - rcvbuflen;
2030         } else {
2031             /* Finally read the full message. */
2032             hdr = (clusterMsg*) link->rcvbuf;
2033             if (rcvbuflen == 8) {
2034                 /* Perform some sanity check on the message signature
2035                  * and length. */
2036                 if (memcmp(hdr->sig,"RCmb",4) != 0 ||
2037                     ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
2038                 {
2039                     serverLog(LL_WARNING,
2040                         "Bad message length or signature received "
2041                         "from Cluster bus.");
2042                     handleLinkIOError(link);
2043                     return;
2044                 }
2045             }
2046             readlen = ntohl(hdr->totlen) - rcvbuflen;
2047             if (readlen > sizeof(buf)) readlen = sizeof(buf);
2048         }
2049 
2050         nread = read(fd,buf,readlen);
2051         if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */
2052 
2053         if (nread <= 0) {
2054             /* I/O error... */
2055             serverLog(LL_DEBUG,"I/O error reading from node link: %s",
2056                 (nread == 0) ? "connection closed" : strerror(errno));
2057             handleLinkIOError(link);
2058             return;
2059         } else {
2060             /* Read data and recast the pointer to the new buffer. */
2061             link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
2062             hdr = (clusterMsg*) link->rcvbuf;
2063             rcvbuflen += nread;
2064         }
2065 
2066         /* Total length obtained? Process this packet. */
2067         if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
2068             if (clusterProcessPacket(link)) {
2069                 sdsfree(link->rcvbuf);
2070                 link->rcvbuf = sdsempty();
2071             } else {
2072                 return; /* Link no longer valid. */
2073             }
2074         }
2075     }
2076 }
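
/* Illustrative sketch (not from the upstream source, and assuming the
 * clusterMsg header starts with the 4-byte "RCmb" signature immediately
 * followed by the 32-bit big-endian totlen, which is what the 8-byte
 * bootstrap read above implies): peeking at the length of a message given
 * only its first 8 bytes: */
static uint32_t clusterPeekLenSketch(const unsigned char *first8) {
    uint32_t netlen;

    if (memcmp(first8,"RCmb",4) != 0) return 0; /* Bad signature. */
    memcpy(&netlen,first8+4,sizeof(netlen));    /* Unaligned-safe read. */
    return ntohl(netlen);
}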
2077 
2078 /* Put stuff into the send buffer.
2079  *
2080  * It is guaranteed that this function will never have as a side effect
2081  * the link to be invalidated, so it is safe to call this function
2082  * from event handlers that will do stuff with the same link later. */
2083 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
2084     if (sdslen(link->sndbuf) == 0 && msglen != 0)
2085         aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
2086                     clusterWriteHandler,link);
2087 
2088     link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
2089     server.cluster->stats_bus_messages_sent++;
2090 }
2091 
2092 /* Send a message to all the nodes that are part of the cluster having
2093  * a connected link.
2094  *
2095  * It is guaranteed that this function will never have as a side effect
2096  * some node->link to be invalidated, so it is safe to call this function
2097  * from event handlers that will do stuff with node links later. */
2098 void clusterBroadcastMessage(void *buf, size_t len) {
2099     dictIterator *di;
2100     dictEntry *de;
2101 
2102     di = dictGetSafeIterator(server.cluster->nodes);
2103     while((de = dictNext(di)) != NULL) {
2104         clusterNode *node = dictGetVal(de);
2105 
2106         if (!node->link) continue;
2107         if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
2108             continue;
2109         clusterSendMessage(node->link,buf,len);
2110     }
2111     dictReleaseIterator(di);
2112 }
2113 
2114 /* Build the message header. hdr must point to a buffer at least
2115  * sizeof(clusterMsg) in bytes. */
2116 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
2117     int totlen = 0;
2118     uint64_t offset;
2119     clusterNode *master;
2120 
2121     /* If this node is a master, we send its slots bitmap and configEpoch.
2122      * If this node is a slave we send the master's information instead (the
2123      * node is flagged as slave so the receiver knows that it is NOT really
2124      * in charge of these slots). */
2125     master = (nodeIsSlave(myself) && myself->slaveof) ?
2126               myself->slaveof : myself;
2127 
2128     memset(hdr,0,sizeof(*hdr));
2129     hdr->ver = htons(CLUSTER_PROTO_VER);
2130     hdr->sig[0] = 'R';
2131     hdr->sig[1] = 'C';
2132     hdr->sig[2] = 'm';
2133     hdr->sig[3] = 'b';
2134     hdr->type = htons(type);
2135     memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
2136 
2137     memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
2138     memset(hdr->slaveof,0,CLUSTER_NAMELEN);
2139     if (myself->slaveof != NULL)
2140         memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
2141     hdr->port = htons(server.port);
2142     hdr->flags = htons(myself->flags);
2143     hdr->state = server.cluster->state;
2144 
2145     /* Set the currentEpoch and configEpochs. */
2146     hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
2147     hdr->configEpoch = htonu64(master->configEpoch);
2148 
2149     /* Set the replication offset. */
2150     if (nodeIsSlave(myself))
2151         offset = replicationGetSlaveOffset();
2152     else
2153         offset = server.master_repl_offset;
2154     hdr->offset = htonu64(offset);
2155 
2156     /* Set the message flags. */
2157     if (nodeIsMaster(myself) && server.cluster->mf_end)
2158         hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
2159 
2160     /* Compute the message length for certain messages. For other messages
2161      * this is up to the caller. */
2162     if (type == CLUSTERMSG_TYPE_FAIL) {
2163         totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2164         totlen += sizeof(clusterMsgDataFail);
2165     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2166         totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2167         totlen += sizeof(clusterMsgDataUpdate);
2168     }
2169     hdr->totlen = htonl(totlen);
2170     /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
2171 }
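
/* Illustrative sketch (not from the upstream source; htonu64()/ntohu64()
 * are provided by endianconv.h): 64-bit fields such as currentEpoch and
 * offset travel big-endian on the bus, so on little-endian hosts they must
 * be byte swapped. One portable way such a conversion can be written: */
static uint64_t htonu64Sketch(uint64_t v) {
    unsigned char be[8];
    uint64_t out;
    int i;

    for (i = 0; i < 8; i++) be[i] = (v >> (56-8*i)) & 0xff; /* MSB first. */
    memcpy(&out,be,sizeof(out)); /* Reinterpret the bytes on this host. */
    return out;
}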
2172 
2173 /* Send a PING or PONG packet to the specified node, making sure to add enough
2174  * gossip information. */
2175 void clusterSendPing(clusterLink *link, int type) {
2176     unsigned char *buf;
2177     clusterMsg *hdr;
2178     int gossipcount = 0; /* Number of gossip sections added so far. */
2179     int wanted; /* Number of gossip sections we want to append if possible. */
2180     int totlen; /* Total packet length. */
2181     /* freshnodes is the max number of nodes we can hope to append at all:
2182      * nodes available minus two (ourself and the node we are sending the
2183      * message to). However in practice there may be fewer valid nodes, since
2184      * nodes in handshake state or disconnected nodes are not considered. */
2185     int freshnodes = dictSize(server.cluster->nodes)-2;
2186 
2187     /* How many gossip sections do we want to add? 1/10 of the number of
2188      * nodes, and at least 3 in any case. Why 1/10?
2189      *
2190      * If we have N masters, with N/10 entries, and we consider that in
2191      * node_timeout we exchange with each other node at least 4 packets
2192      * (we ping in the worst case in node_timeout/2 time, and we also
2193      * receive two pings from the host), we have a total of 8 packets
2194      * in the node_timeout*2 failure reports validity time. So we have
2195      * that, for a single PFAIL node, we can expect to receive the following
2196      * number of failure reports (in the specified window of time):
2197      *
2198      * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
2199      *
2200      * PROB = probability of being featured in a single gossip entry,
2201      *        which is 1 / NUM_OF_NODES.
2202      * ENTRIES = 10.
2203      * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
2204      *
2205      * If we assume we have just masters (so num of nodes and num of masters
2206      * is the same), with 1/10 we always get over the majority, and specifically
2207      * 80% of the number of nodes, to account for many masters failing at the
2208      * same time.
2209      *
2210      * Since we have non-voting slaves that lower the probability of an entry
2211      * to feature our node, we set the number of entries per packet as
2212      * 10% of the total nodes we have. */
2213     wanted = floor(dictSize(server.cluster->nodes)/10);
2214     if (wanted < 3) wanted = 3;
2215     if (wanted > freshnodes) wanted = freshnodes;
2216 
2217     /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
2218      * later according to the number of gossip sections we really were able
2219      * to put inside the packet. */
2220     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2221     totlen += (sizeof(clusterMsgDataGossip)*wanted);
2222     /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
2223      * sizeof(clusterMsg) or more. */
2224     if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
2225     buf = zcalloc(totlen);
2226     hdr = (clusterMsg*) buf;
2227 
2228     /* Populate the header. */
2229     if (link->node && type == CLUSTERMSG_TYPE_PING)
2230         link->node->ping_sent = mstime();
2231     clusterBuildMessageHdr(hdr,type);
2232 
2233     /* Populate the gossip fields */
2234     int maxiterations = wanted*3;
2235     while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
2236         dictEntry *de = dictGetRandomKey(server.cluster->nodes);
2237         clusterNode *this = dictGetVal(de);
2238         clusterMsgDataGossip *gossip;
2239         int j;
2240 
2241         /* Don't include this node: the whole packet header is about us
2242          * already, so we just gossip about other nodes. */
2243         if (this == myself) continue;
2244 
2245         /* Give a bias to FAIL/PFAIL nodes. */
2246         if (maxiterations > wanted*2 &&
2247             !(this->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL)))
2248             continue;
2249 
2250         /* In the gossip section don't include:
2251          * 1) Nodes in HANDSHAKE state.
2252          * 2) Nodes with the NOADDR flag set.
2253          * 3) Disconnected nodes if they don't have configured slots.
2254          */
2255         if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
2256             (this->link == NULL && this->numslots == 0))
2257         {
2258             freshnodes--; /* Technically not correct, but saves CPU. */
2259             continue;
2260         }
2261 
2262         /* Check if we already added this node */
2263         for (j = 0; j < gossipcount; j++) {
2264             if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
2265                     CLUSTER_NAMELEN) == 0) break;
2266         }
2267         if (j != gossipcount) continue;
2268 
2269         /* Add it */
2270         freshnodes--;
2271         gossip = &(hdr->data.ping.gossip[gossipcount]);
2272         memcpy(gossip->nodename,this->name,CLUSTER_NAMELEN);
2273         gossip->ping_sent = htonl(this->ping_sent);
2274         gossip->pong_received = htonl(this->pong_received);
2275         memcpy(gossip->ip,this->ip,sizeof(this->ip));
2276         gossip->port = htons(this->port);
2277         gossip->flags = htons(this->flags);
2278         gossip->notused1 = 0;
2279         gossip->notused2 = 0;
2280         gossipcount++;
2281     }
2282 
2283     /* Ready to send... fix the totlen field and queue the message in the
2284      * output buffer. */
2285     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2286     totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
2287     hdr->count = htons(gossipcount);
2288     hdr->totlen = htonl(totlen);
2289     clusterSendMessage(link,buf,totlen);
2290     zfree(buf);
2291 }
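
/* Illustrative sketch (not from the upstream source) restating the sizing
 * policy above: with 100 known nodes each PING carries floor(100/10) = 10
 * gossip entries, small clusters are clamped up to 3 entries, and the count
 * can never exceed the nodes actually available: */
static int gossipWantedSketch(int numnodes) {
    int freshnodes = numnodes - 2;  /* Exclude myself and the receiver. */
    int wanted = numnodes / 10;     /* floor(N/10) entries per packet. */

    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;
    return wanted;
}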
2292 
2293 /* Send a PONG packet to every connected node that's not in handshake state
2294  * and for which we have a valid link.
2295  *
2296  * In Redis Cluster pongs are not used just for failure detection, but also
2297  * to carry important configuration information. So broadcasting a pong is
2298  * useful when something changes in the configuration and we want to make
2299  * the cluster aware ASAP (for instance after a slave promotion).
2300  *
2301  * The 'target' argument specifies the receiving instances using the
2302  * defines below:
2303  *
2304  * CLUSTER_BROADCAST_ALL -> All known instances.
2305  * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
2306  */
2307 #define CLUSTER_BROADCAST_ALL 0
2308 #define CLUSTER_BROADCAST_LOCAL_SLAVES 1
2309 void clusterBroadcastPong(int target) {
2310     dictIterator *di;
2311     dictEntry *de;
2312 
2313     di = dictGetSafeIterator(server.cluster->nodes);
2314     while((de = dictNext(di)) != NULL) {
2315         clusterNode *node = dictGetVal(de);
2316 
2317         if (!node->link) continue;
2318         if (node == myself || nodeInHandshake(node)) continue;
2319         if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
2320             int local_slave =
2321                 nodeIsSlave(node) && node->slaveof &&
2322                 (node->slaveof == myself || node->slaveof == myself->slaveof);
2323             if (!local_slave) continue;
2324         }
2325         clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
2326     }
2327     dictReleaseIterator(di);
2328 }
2329 
2330 /* Send a PUBLISH message.
2331  *
2332  * If link is NULL, then the message is broadcasted to the whole cluster. */
2333 void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
2334     unsigned char buf[sizeof(clusterMsg)], *payload;
2335     clusterMsg *hdr = (clusterMsg*) buf;
2336     uint32_t totlen;
2337     uint32_t channel_len, message_len;
2338 
2339     channel = getDecodedObject(channel);
2340     message = getDecodedObject(message);
2341     channel_len = sdslen(channel->ptr);
2342     message_len = sdslen(message->ptr);
2343 
2344     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
2345     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2346     totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
2347 
2348     hdr->data.publish.msg.channel_len = htonl(channel_len);
2349     hdr->data.publish.msg.message_len = htonl(message_len);
2350     hdr->totlen = htonl(totlen);
2351 
2352     /* Try to use the local buffer if possible */
2353     if (totlen < sizeof(buf)) {
2354         payload = buf;
2355     } else {
2356         payload = zmalloc(totlen);
2357         memcpy(payload,hdr,sizeof(*hdr));
2358         hdr = (clusterMsg*) payload;
2359     }
2360     memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
2361     memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
2362         message->ptr,sdslen(message->ptr));
2363 
2364     if (link)
2365         clusterSendMessage(link,payload,totlen);
2366     else
2367         clusterBroadcastMessage(payload,totlen);
2368 
2369     decrRefCount(channel);
2370     decrRefCount(message);
2371     if (payload != buf) zfree(payload);
2372 }
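
/* Illustrative sketch (not from the upstream source, and assuming the
 * "- 8" above exists because clusterMsgDataPublish declares bulk_data as an
 * 8 byte placeholder array that the real channel+message payload replaces):
 * the on-wire size of a PUBLISH message is then: */
static uint32_t publishLenSketch(uint32_t channel_len, uint32_t message_len) {
    return sizeof(clusterMsg) - sizeof(union clusterMsgData) +
           sizeof(clusterMsgDataPublish) - 8 + /* bulk_data placeholder */
           channel_len + message_len;
}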
2373 
2374 /* Send a FAIL message to all the nodes we are able to contact.
2375  * The FAIL message is sent when we detect that a node is failing
2376  * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
2377  * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
2378  * nodes to do the same ASAP. */
2379 void clusterSendFail(char *nodename) {
2380     unsigned char buf[sizeof(clusterMsg)];
2381     clusterMsg *hdr = (clusterMsg*) buf;
2382 
2383     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
2384     memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
2385     clusterBroadcastMessage(buf,ntohl(hdr->totlen));
2386 }
2387 
2388 /* Send an UPDATE message to the specified link carrying the specified 'node'
2389  * slots configuration. The node name, slots bitmap, and configEpoch info
2390  * are included. */
2391 void clusterSendUpdate(clusterLink *link, clusterNode *node) {
2392     unsigned char buf[sizeof(clusterMsg)];
2393     clusterMsg *hdr = (clusterMsg*) buf;
2394 
2395     if (link == NULL) return;
2396     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
2397     memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
2398     hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
2399     memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
2400     clusterSendMessage(link,buf,ntohl(hdr->totlen));
2401 }
2402 
2403 /* -----------------------------------------------------------------------------
2404  * CLUSTER Pub/Sub support
2405  *
2406  * For now we do very little, just propagating PUBLISH messages across the whole
2407  * cluster. In the future we'll try to get smarter and avoid propagating those
2408  * messages to hosts without receivers for a given channel.
2409  * -------------------------------------------------------------------------- */
2410 void clusterPropagatePublish(robj *channel, robj *message) {
2411     clusterSendPublish(NULL, channel, message);
2412 }
2413 
2414 /* -----------------------------------------------------------------------------
2415  * SLAVE node specific functions
2416  * -------------------------------------------------------------------------- */
2417 
2418 /* This function sends a FAILOVER_AUTH_REQUEST message to every node in order
2419  * to see if there is quorum for this slave instance to failover its failing
2420  * master.
2421  *
2422  * Note that we send the failover request to everybody, master and slave nodes,
2423  * but only the masters are supposed to reply to our query. */
2424 void clusterRequestFailoverAuth(void) {
2425     unsigned char buf[sizeof(clusterMsg)];
2426     clusterMsg *hdr = (clusterMsg*) buf;
2427     uint32_t totlen;
2428 
2429     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
2430     /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
2431      * in the header to communicate to the nodes receiving the message that
2432      * they should authorize the failover even if the master is working. */
2433     if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
2434     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2435     hdr->totlen = htonl(totlen);
2436     clusterBroadcastMessage(buf,totlen);
2437 }
2438 
2439 /* Send a FAILOVER_AUTH_ACK message to the specified node. */
2440 void clusterSendFailoverAuth(clusterNode *node) {
2441     unsigned char buf[sizeof(clusterMsg)];
2442     clusterMsg *hdr = (clusterMsg*) buf;
2443     uint32_t totlen;
2444 
2445     if (!node->link) return;
2446     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
2447     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2448     hdr->totlen = htonl(totlen);
2449     clusterSendMessage(node->link,buf,totlen);
2450 }
2451 
2452 /* Send a MFSTART message to the specified node. */
2453 void clusterSendMFStart(clusterNode *node) {
2454     unsigned char buf[sizeof(clusterMsg)];
2455     clusterMsg *hdr = (clusterMsg*) buf;
2456     uint32_t totlen;
2457 
2458     if (!node->link) return;
2459     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
2460     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2461     hdr->totlen = htonl(totlen);
2462     clusterSendMessage(node->link,buf,totlen);
2463 }
2464 
2465 /* Vote for the node asking for our vote if there are the conditions. */
2466 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
2467     clusterNode *master = node->slaveof;
2468     uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
2469     uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
2470     unsigned char *claimed_slots = request->myslots;
2471     int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
2472     int j;
2473 
2474     /* If we are not a master serving at least 1 slot, we don't have the
2475      * right to vote, as the cluster size in Redis Cluster is the number
2476      * of masters serving at least one slot, and the quorum is half the
2477      * cluster size plus one. */
2478     if (nodeIsSlave(myself) || myself->numslots == 0) return;
2479 
2480     /* Request epoch must be >= our currentEpoch.
2481      * Note that it is impossible for it to actually be greater since
2482      * our currentEpoch was updated as a side effect of receiving this
2483      * request, if the request epoch was greater. */
2484     if (requestCurrentEpoch < server.cluster->currentEpoch) {
2485         serverLog(LL_WARNING,
2486             "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
2487             node->name,
2488             (unsigned long long) requestCurrentEpoch,
2489             (unsigned long long) server.cluster->currentEpoch);
2490         return;
2491     }
2492 
2493     /* I already voted for this epoch? Return ASAP. */
2494     if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
2495         serverLog(LL_WARNING,
2496                 "Failover auth denied to %.40s: already voted for epoch %llu",
2497                 node->name,
2498                 (unsigned long long) server.cluster->currentEpoch);
2499         return;
2500     }
2501 
2502     /* The node must be a slave and its master must be down.
2503      * The master can be non-failing if the request is flagged
2504      * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
2505     if (nodeIsMaster(node) || master == NULL ||
2506         (!nodeFailed(master) && !force_ack))
2507     {
2508         if (nodeIsMaster(node)) {
2509             serverLog(LL_WARNING,
2510                     "Failover auth denied to %.40s: it is a master node",
2511                     node->name);
2512         } else if (master == NULL) {
2513             serverLog(LL_WARNING,
2514                     "Failover auth denied to %.40s: I don't know its master",
2515                     node->name);
2516         } else if (!nodeFailed(master)) {
2517             serverLog(LL_WARNING,
2518                     "Failover auth denied to %.40s: its master is up",
2519                     node->name);
2520         }
2521         return;
2522     }
2523 
2524     /* We did not vote for a slave of this master within two
2525      * times the node timeout. This is not strictly needed for the
2526      * correctness of the algorithm but makes the base case more linear. */
2527     if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
2528     {
2529         serverLog(LL_WARNING,
2530                 "Failover auth denied to %.40s: "
2531                 "can't vote about this master before %lld milliseconds",
2532                 node->name,
2533                 (long long) ((server.cluster_node_timeout*2)-
2534                              (mstime() - node->slaveof->voted_time)));
2535         return;
2536     }
2537 
2538     /* The slave requesting the vote must have a configEpoch for the claimed
2539      * slots that is >= the one of the masters currently serving the same
2540      * slots in the current configuration. */
2541     for (j = 0; j < CLUSTER_SLOTS; j++) {
2542         if (bitmapTestBit(claimed_slots, j) == 0) continue;
2543         if (server.cluster->slots[j] == NULL ||
2544             server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
2545         {
2546             continue;
2547         }
2548         /* If we reached this point we found a slot that, in our current
2549          * configuration, is served by a master with a greater configEpoch
2550          * than the one claimed by the slave. Refuse to vote for this slave. */
2551         serverLog(LL_WARNING,
2552                 "Failover auth denied to %.40s: "
2553                 "slot %d epoch (%llu) > reqEpoch (%llu)",
2554                 node->name, j,
2555                 (unsigned long long) server.cluster->slots[j]->configEpoch,
2556                 (unsigned long long) requestConfigEpoch);
2557         return;
2558     }
2559 
2560     /* We can vote for this slave. */
2561     clusterSendFailoverAuth(node);
2562     server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
2563     node->slaveof->voted_time = mstime();
2564     serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
2565         node->name, (unsigned long long) server.cluster->currentEpoch);
2566 }
2567 
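/* Editor's summary (not part of the original file) of the vote gates
 * applied above, in order:
 *  1) the voter must be a master serving at least one slot;
 *  2) the request currentEpoch must be >= our currentEpoch;
 *  3) at most one vote is granted per epoch (lastVoteEpoch);
 *  4) the requester must be a slave of a failing master, unless the request
 *     carries CLUSTERMSG_FLAG0_FORCEACK (manual failover);
 *  5) a per-master cooldown of 2 * cluster_node_timeout between votes;
 *  6) no claimed slot may be owned by a master with a greater configEpoch. */
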
2568 /* This function returns the "rank" of this instance, a slave, in the context
2569  * of its master-slaves ring. The rank of the slave is given by the number of
2570  * other slaves for the same master that have a better replication offset
2571  * compared to the local one (better means greater, so they claim more data).
2572  *
2573  * A slave with rank 0 is the one with the greatest (most up to date)
2574  * replication offset, and so forth. Note that, because of how the rank is
2575  * computed, multiple slaves may have the same rank if they have the same offset.
2576  *
2577  * The slave rank is used to add a delay to start an election in order to
2578  * get voted and replace a failing master. Slaves with better replication
2579  * offsets are more likely to win. */
2580 int clusterGetSlaveRank(void) {
2581     long long myoffset;
2582     int j, rank = 0;
2583     clusterNode *master;
2584 
2585     serverAssert(nodeIsSlave(myself));
2586     master = myself->slaveof;
2587     if (master == NULL) return 0; /* Never called by slaves without master. */
2588 
2589     myoffset = replicationGetSlaveOffset();
2590     for (j = 0; j < master->numslaves; j++)
2591         if (master->slaves[j] != myself &&
2592             master->slaves[j]->repl_offset > myoffset) rank++;
2593     return rank;
2594 }
2595 
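/* Illustrative sketch (editor's addition, not part of the original file):
 * how the rank computed above feeds the election delay used by
 * clusterHandleSlaveFailover() below. The helper name is hypothetical;
 * random() is from <stdlib.h>, already used elsewhere in this file. */
static long long exampleElectionDelayMs(int rank) {
    /* 500 ms fixed delay to let the FAIL message propagate, plus a random
     * 0-499 ms component, plus 1000 ms per unit of rank. */
    return 500 + (random() % 500) + (long long)rank * 1000;
}
/* E.g. rank 0 yields 500-999 ms, rank 2 yields 2500-2999 ms. */
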
2596 /* This function is called by clusterHandleSlaveFailover() in order to
2597  * let the slave log why it is not able to failover. Sometimes the
2598  * conditions are not met, but since the failover function is called
2599  * again and again, we can't log the same things continuously.
2600  *
2601  * This function works by logging only if a given set of conditions are
2602  * true:
2603  *
2604  * 1) The reason for which the failover can't be initiated changed.
2605  *    The reasons also include a NONE reason, to which the state is reset
2606  *    when the slave finds that its master is fine (no FAIL flag).
2607  * 2) Also, the log is emitted again if the master is still down and
2608  *    the reason for not failing over is still the same, but more than
2609  *    CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
2610  * 3) Finally, the function only logs if the master has been down for more
2611  *    than five seconds + NODE_TIMEOUT. This way nothing is logged when a
2612  *    failover starts in a reasonable time.
2613  *
2614  * The function is called with the reason why the slave can't failover
2615  * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
2616  *
2617  * The function is guaranteed to be called only if 'myself' is a slave. */
2618 void clusterLogCantFailover(int reason) {
2619     char *msg;
2620     static time_t lastlog_time = 0;
2621     mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
2622 
2623     /* Don't log if we have the same reason for some time. */
2624     if (reason == server.cluster->cant_failover_reason &&
2625         time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
2626         return;
2627 
2628     server.cluster->cant_failover_reason = reason;
2629 
2630     /* We also don't emit any log if the master failed not long ago: the
2631      * goal of this function is to log slaves in a stalled condition for
2632      * a long time. */
2633     if (myself->slaveof &&
2634         nodeFailed(myself->slaveof) &&
2635         (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;
2636 
2637     switch(reason) {
2638     case CLUSTER_CANT_FAILOVER_DATA_AGE:
2639         msg = "Disconnected from master for longer than allowed. "
2640               "Please check the 'cluster-slave-validity-factor' configuration "
2641               "option.";
2642         break;
2643     case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
2644         msg = "Waiting the delay before I can start a new failover.";
2645         break;
2646     case CLUSTER_CANT_FAILOVER_EXPIRED:
2647         msg = "Failover attempt expired.";
2648         break;
2649     case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
2650         msg = "Waiting for votes, but majority still not reached.";
2651         break;
2652     default:
2653         msg = "Unknown reason code.";
2654         break;
2655     }
2656     lastlog_time = time(NULL);
2657     serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
2658 }
2659 
2660 /* This function implements the final part of automatic and manual failovers,
2661  * where the slave grabs its master's hash slots, and propagates the new
2662  * configuration.
2663  *
2664  * Note that it's up to the caller to be sure that the node got a new
2665  * configuration epoch already. */
2666 void clusterFailoverReplaceYourMaster(void) {
2667     int j;
2668     clusterNode *oldmaster = myself->slaveof;
2669 
2670     if (nodeIsMaster(myself) || oldmaster == NULL) return;
2671 
2672     /* 1) Turn this node into a master. */
2673     clusterSetNodeAsMaster(myself);
2674     replicationUnsetMaster();
2675 
2676     /* 2) Claim all the slots assigned to our master. */
2677     for (j = 0; j < CLUSTER_SLOTS; j++) {
2678         if (clusterNodeGetSlotBit(oldmaster,j)) {
2679             clusterDelSlot(j);
2680             clusterAddSlot(myself,j);
2681         }
2682     }
2683 
2684     /* 3) Update state and save config. */
2685     clusterUpdateState();
2686     clusterSaveConfigOrDie(1);
2687 
2688     /* 4) Pong all the other nodes so that they can update the state
2689      *    accordingly and detect that we switched to master role. */
2690     clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
2691 
2692     /* 5) If there was a manual failover in progress, clear the state. */
2693     resetManualFailover();
2694 }
2695 
2696 /* This function is called if we are a slave node and our master serving
2697  * a non-zero amount of hash slots is in FAIL state.
2698  *
2699  * The goal of this function is:
2700  * 1) To check if we are able to perform a failover (is our data updated?).
2701  * 2) Try to get elected by masters.
2702  * 3) Perform the failover informing all the other nodes.
2703  */
2704 void clusterHandleSlaveFailover(void) {
2705     mstime_t data_age;
2706     mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
2707     int needed_quorum = (server.cluster->size / 2) + 1;
2708     int manual_failover = server.cluster->mf_end != 0 &&
2709                           server.cluster->mf_can_start;
2710     mstime_t auth_timeout, auth_retry_time;
2711 
2712     server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
2713 
2714     /* Compute the failover timeout (the max time we have to send votes
2715      * and wait for replies), and the failover retry time (the time to wait
2716      * before trying to get voted again).
2717      *
2718      * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
2719      * Retry is two times the Timeout.
2720      */
2721     auth_timeout = server.cluster_node_timeout*2;
2722     if (auth_timeout < 2000) auth_timeout = 2000;
2723     auth_retry_time = auth_timeout*2;
2724 
2725     /* Preconditions to run the function, that must be met for both
2726      * automatic and manual failovers:
2727      * 1) We are a slave.
2728      * 2) Our master is flagged as FAIL, or this is a manual failover.
2729      * 3) Our master is serving slots. */
2730     if (nodeIsMaster(myself) ||
2731         myself->slaveof == NULL ||
2732         (!nodeFailed(myself->slaveof) && !manual_failover) ||
2733         myself->slaveof->numslots == 0)
2734     {
2735         /* There are no reasons to failover, so we set the reason why we
2736          * are returning without failing over to NONE. */
2737         server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
2738         return;
2739     }
2740 
2741     /* Set data_age to the number of milliseconds we are disconnected from
2742      * the master. */
2743     if (server.repl_state == REPL_STATE_CONNECTED) {
2744         data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
2745                    * 1000;
2746     } else {
2747         data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
2748     }
2749 
2750     /* Remove the node timeout from the data age as it is fine that we are
2751      * disconnected from our master at least for the time it was down to be
2752      * flagged as FAIL, that's the baseline. */
2753     if (data_age > server.cluster_node_timeout)
2754         data_age -= server.cluster_node_timeout;
2755 
2756     /* Check if our data is recent enough according to the slave validity
2757      * factor configured by the user.
2758      *
2759      * Check bypassed for manual failovers. */
2760     if (server.cluster_slave_validity_factor &&
2761         data_age >
2762         (((mstime_t)server.repl_ping_slave_period * 1000) +
2763          (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
2764     {
2765         if (!manual_failover) {
2766             clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
2767             return;
2768         }
2769     }
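    /* Worked example (editor's note, not part of the original file): with
     * cluster-node-timeout=15000, cluster-slave-validity-factor=10 and
     * repl-ping-slave-period=10, the threshold above is
     * 10*1000 + 15000*10 = 160000 ms, so a slave disconnected (beyond the
     * node timeout baseline) for more than 160 seconds refuses to start an
     * automatic failover. */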
2770 
2771     /* If the previous failover attempt timed out and the retry time has
2772      * elapsed, we can set up a new one. */
2773     if (auth_age > auth_retry_time) {
2774         server.cluster->failover_auth_time = mstime() +
2775             500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
2776             random() % 500; /* Random delay between 0 and 500 milliseconds. */
2777         server.cluster->failover_auth_count = 0;
2778         server.cluster->failover_auth_sent = 0;
2779         server.cluster->failover_auth_rank = clusterGetSlaveRank();
2780         /* We add another delay that is proportional to the slave rank.
2781          * Specifically 1 second * rank. This way slaves that probably have
2782          * a less updated replication offset are penalized. */
2783         server.cluster->failover_auth_time +=
2784             server.cluster->failover_auth_rank * 1000;
2785         /* However if this is a manual failover, no delay is needed. */
2786         if (server.cluster->mf_end) {
2787             server.cluster->failover_auth_time = mstime();
2788             server.cluster->failover_auth_rank = 0;
2789         }
2790         serverLog(LL_WARNING,
2791             "Start of election delayed for %lld milliseconds "
2792             "(rank #%d, offset %lld).",
2793             server.cluster->failover_auth_time - mstime(),
2794             server.cluster->failover_auth_rank,
2795             replicationGetSlaveOffset());
2796         /* Now that we have a scheduled election, broadcast our offset
2797          * to all the other slaves so that they'll update their offsets
2798          * if our offset is better. */
2799         clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
2800         return;
2801     }
2802 
2803     /* It is possible that we received more updated offsets from other
2804      * slaves for the same master since we computed our election delay.
2805      * Update the delay if our rank changed.
2806      *
2807      * Not performed if this is a manual failover. */
2808     if (server.cluster->failover_auth_sent == 0 &&
2809         server.cluster->mf_end == 0)
2810     {
2811         int newrank = clusterGetSlaveRank();
2812         if (newrank > server.cluster->failover_auth_rank) {
2813             long long added_delay =
2814                 (newrank - server.cluster->failover_auth_rank) * 1000;
2815             server.cluster->failover_auth_time += added_delay;
2816             server.cluster->failover_auth_rank = newrank;
2817             serverLog(LL_WARNING,
2818                 "Slave rank updated to #%d, added %lld milliseconds of delay.",
2819                 newrank, added_delay);
2820         }
2821     }
2822 
2823     /* Return ASAP if we still can't start the election. */
2824     if (mstime() < server.cluster->failover_auth_time) {
2825         clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
2826         return;
2827     }
2828 
2829     /* Return ASAP if the election is too old to be valid. */
2830     if (auth_age > auth_timeout) {
2831         clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
2832         return;
2833     }
2834 
2835     /* Ask for votes if needed. */
2836     if (server.cluster->failover_auth_sent == 0) {
2837         server.cluster->currentEpoch++;
2838         server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
2839         serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
2840             (unsigned long long) server.cluster->currentEpoch);
2841         clusterRequestFailoverAuth();
2842         server.cluster->failover_auth_sent = 1;
2843         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2844                              CLUSTER_TODO_UPDATE_STATE|
2845                              CLUSTER_TODO_FSYNC_CONFIG);
2846         return; /* Wait for replies. */
2847     }
2848 
2849     /* Check if we reached the quorum. */
2850     if (server.cluster->failover_auth_count >= needed_quorum) {
2851         /* We have the quorum, we can finally failover the master. */
2852 
2853         serverLog(LL_WARNING,
2854             "Failover election won: I'm the new master.");
2855 
2856         /* Update my configEpoch to the epoch of the election. */
2857         if (myself->configEpoch < server.cluster->failover_auth_epoch) {
2858             myself->configEpoch = server.cluster->failover_auth_epoch;
2859             serverLog(LL_WARNING,
2860                 "configEpoch set to %llu after successful failover",
2861                 (unsigned long long) myself->configEpoch);
2862         }
2863 
2864         /* Take responsibility for the cluster slots. */
2865         clusterFailoverReplaceYourMaster();
2866     } else {
2867         clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
2868     }
2869 }
2870 
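/* Illustrative sketch (editor's addition, not part of the original file):
 * the election timing and quorum arithmetic used above, written out as a
 * standalone helper. Names are hypothetical. */
static void exampleElectionParams(long long node_timeout_ms, int cluster_size) {
    long long auth_timeout = node_timeout_ms * 2;  /* Time to collect votes. */
    if (auth_timeout < 2000) auth_timeout = 2000;  /* i.e. MAX(nt*2, 2000).  */
    long long auth_retry = auth_timeout * 2;       /* Wait before retrying.  */
    int needed_quorum = (cluster_size / 2) + 1;    /* Majority of masters.   */
    /* E.g. node_timeout=15000 and size=3: timeout=30000 ms,
     * retry=60000 ms, quorum=2 votes. */
    (void)auth_retry; (void)needed_quorum;
}
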
2871 /* -----------------------------------------------------------------------------
2872  * CLUSTER slave migration
2873  *
2874  * Slave migration is the process that allows a slave of a master that is
2875  * already covered by at least another slave, to "migrate" to a master that
2876  * is orphaned, that is, left with no working slaves.
2877  * ------------------------------------------------------------------------- */
2878 
2879 /* This function is responsible for deciding if this replica should be migrated
2880  * to a different (orphaned) master. It is called by the clusterCron() function
2881  * only if:
2882  *
2883  * 1) We are a slave node.
2884  * 2) It was detected that there is at least one orphaned master in
2885  *    the cluster.
2886  * 3) We are a slave of one of the masters with the greatest number of
2887  *    slaves.
2888  *
2889  * These checks are performed by the caller, since it needs to iterate over
2890  * the nodes anyway, so we only spend time in clusterHandleSlaveMigration()
2891  * when it is definitely needed.
2892  *
2893  * The function is called with a pre-computed max_slaves, that is, the max
2894  * number of working (not in FAIL state) slaves for a single master.
2895  *
2896  * Additional conditions for migration are examined inside the function.
2897  */
2898 void clusterHandleSlaveMigration(int max_slaves) {
2899     int j, okslaves = 0;
2900     clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
2901     dictIterator *di;
2902     dictEntry *de;
2903 
2904     /* Step 1: Don't migrate if the cluster state is not ok. */
2905     if (server.cluster->state != CLUSTER_OK) return;
2906 
2907     /* Step 2: Don't migrate if my master will not be left with at least
2908      *         'migration-barrier' slaves after my migration. */
2909     if (mymaster == NULL) return;
2910     for (j = 0; j < mymaster->numslaves; j++)
2911         if (!nodeFailed(mymaster->slaves[j]) &&
2912             !nodeTimedOut(mymaster->slaves[j])) okslaves++;
2913     if (okslaves <= server.cluster_migration_barrier) return;
2914 
2915     /* Step 3: Identify a candidate for migration, and check if among the
2916      * masters with the greatest number of ok slaves, I'm the one with the
2917      * smallest node ID (the "candidate slave").
2918      *
2919      * Note: this means that eventually a replica migration will occur,
2920      * since slaves that are reachable again always have their FAIL flag
2921      * cleared, so eventually there must be a candidate. At the same time
2922      * this does not mean that there are no possible race conditions (two
2923      * slaves migrating at the same time), but this is unlikely to
2924      * happen, and harmless when it happens. */
2925     candidate = myself;
2926     di = dictGetSafeIterator(server.cluster->nodes);
2927     while((de = dictNext(di)) != NULL) {
2928         clusterNode *node = dictGetVal(de);
2929         int okslaves = 0, is_orphaned = 1;
2930 
2931         /* We want to migrate only if this master is working, orphaned, and
2932          * either used to have slaves or failed over a master that had slaves
2933          * (MIGRATE_TO flag). This way we only migrate to instances that were
2934          * supposed to have replicas. */
2935         if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
2936         if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;
2937 
2938         /* Check number of working slaves. */
2939         if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
2940         if (okslaves > 0) is_orphaned = 0;
2941 
2942         if (is_orphaned) {
2943             if (!target && node->numslots > 0) target = node;
2944 
2945             /* Track the starting time of the orphaned condition for this
2946              * master. */
2947             if (!node->orphaned_time) node->orphaned_time = mstime();
2948         } else {
2949             node->orphaned_time = 0;
2950         }
2951 
2952         /* Check if I'm the slave candidate for the migration: attached
2953          * to a master with the maximum number of slaves and with the smallest
2954          * node ID. */
2955         if (okslaves == max_slaves) {
2956             for (j = 0; j < node->numslaves; j++) {
2957                 if (memcmp(node->slaves[j]->name,
2958                            candidate->name,
2959                            CLUSTER_NAMELEN) < 0)
2960                 {
2961                     candidate = node->slaves[j];
2962                 }
2963             }
2964         }
2965     }
2966     dictReleaseIterator(di);
2967 
2968     /* Step 4: perform the migration if there is a target, and if I'm the
2969      * candidate, but only if the master is continuously orphaned for a
2970      * couple of seconds, so that during failovers, we give some time to
2971      * the natural slaves of this instance to advertise their switch from
2972      * the old master to the new one. */
2973     if (target && candidate == myself &&
2974         (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY)
2975     {
2976         serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
2977             target->name);
2978         clusterSetMaster(target);
2979     }
2980 }
2981 
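/* Illustrative sketch (editor's addition, not part of the original file):
 * the "candidate slave" tie-break above. Node names are 40-byte hex strings
 * (CLUSTER_NAMELEN) compared with memcmp(), so the lexicographically
 * smallest ID wins deterministically on every node. The helper is
 * hypothetical and requires count >= 1. */
static const char *examplePickCandidate(const char *names[], int count) {
    const char *best = names[0];
    for (int i = 1; i < count; i++)
        if (memcmp(names[i], best, 40) < 0) best = names[i];
    return best;
}
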
2982 /* -----------------------------------------------------------------------------
2983  * CLUSTER manual failover
2984  *
2985  * These are the important steps performed by slaves during a manual failover:
2986  * 1) The user sends the CLUSTER FAILOVER command. The failover state is
2987  *    initialized by setting mf_end to the millisecond unix time at which
2988  *    we'll abort the attempt.
2989  * 2) Slave sends a MFSTART message to the master requesting to pause clients
2990  *    for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
2991  *    When master is paused for manual failover, it also starts to flag
2992  *    packets with CLUSTERMSG_FLAG0_PAUSED.
2993  * 3) Slave waits for master to send its replication offset flagged as PAUSED.
2994  * 4) If the slave received the offset from the master, and its offset
2995  *    matches, mf_can_start is set to 1, and clusterHandleSlaveFailover()
2996  *    will perform the failover as usual, with the difference that the vote
2997  *    request will be modified to force masters to vote for a slave that
2998  *    has a working master.
2999  *
3000  * From the point of view of the master things are simpler: when a
3001  * PAUSE_CLIENTS packet is received the master sets mf_end as well and
3002  * the sender in mf_slave. During the time limit for the manual failover
3003  * the master will just send PINGs more often to this slave, flagged with
3004  * the PAUSED flag, so that the slave will set mf_master_offset when receiving
3005  * a packet from the master with this flag set.
3006  *
3007  * The goal of the manual failover is to perform a fast failover without
3008  * data loss due to the asynchronous master-slave replication.
3009  * -------------------------------------------------------------------------- */
3010 
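/* Illustrative sketch (editor's addition, not part of the original file):
 * the slave-side state machine described above, reduced to the three fields
 * that drive it. The struct and helper are hypothetical stand-ins for the
 * corresponding server.cluster fields used by clusterHandleManualFailover()
 * below. */
struct example_mf_state {
    long long mf_end;           /* Deadline (ms); 0 = no failover running. */
    long long mf_master_offset; /* Offset announced by the paused master.  */
    int mf_can_start;           /* 1 once our offset matches the master's. */
};

static void exampleHandleManualFailover(struct example_mf_state *mf,
                                        long long my_repl_offset) {
    if (mf->mf_end == 0 || mf->mf_can_start) return; /* Nothing to do.   */
    if (mf->mf_master_offset == 0) return;           /* Wait for offset. */
    if (mf->mf_master_offset == my_repl_offset)
        mf->mf_can_start = 1;   /* Whole stream processed: go ahead.     */
}
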
3011 /* Reset the manual failover state. This works for both masters and slaves,
3012  * as all the state about the manual failover is cleared.
3013  *
3014  * The function can be used either to initialize the manual failover state
3015  * at startup, or to abort an in-progress manual failover. */
3016 void resetManualFailover(void) {
3017     if (server.cluster->mf_end && clientsArePaused()) {
3018         server.clients_pause_end_time = 0;
3019         clientsArePaused(); /* Just use the side effect of the function. */
3020     }
3021     server.cluster->mf_end = 0; /* No manual failover in progress. */
3022     server.cluster->mf_can_start = 0;
3023     server.cluster->mf_slave = NULL;
3024     server.cluster->mf_master_offset = 0;
3025 }
3026 
3027 /* If a manual failover timed out, abort it. */
3028 void manualFailoverCheckTimeout(void) {
3029     if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
3030         serverLog(LL_WARNING,"Manual failover timed out.");
3031         resetManualFailover();
3032     }
3033 }
3034 
3035 /* This function is called from the cluster cron function in order to go
3036  * forward with the manual failover state machine. */
3037 void clusterHandleManualFailover(void) {
3038     /* Return ASAP if no manual failover is in progress. */
3039     if (server.cluster->mf_end == 0) return;
3040 
3041     /* If mf_can_start is non-zero, the failover was already triggered so the
3042      * next steps are performed by clusterHandleSlaveFailover(). */
3043     if (server.cluster->mf_can_start) return;
3044 
3045     if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */
3046 
3047     if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
3048         /* Our replication offset matches the master replication offset
3049          * announced after clients were paused. We can start the failover. */
3050         server.cluster->mf_can_start = 1;
3051         serverLog(LL_WARNING,
3052             "All master replication stream processed, "
3053             "manual failover can start.");
3054     }
3055 }
3056 
3057 /* -----------------------------------------------------------------------------
3058  * CLUSTER cron job
3059  * -------------------------------------------------------------------------- */
3060 
3061 /* This is executed 10 times every second */
3062 void clusterCron(void) {
3063     dictIterator *di;
3064     dictEntry *de;
3065     int update_state = 0;
3066     int orphaned_masters; /* How many masters there are without ok slaves. */
3067     int max_slaves; /* Max number of ok slaves for a single master. */
3068     int this_slaves; /* Number of ok slaves for our master (if we are slave). */
3069     mstime_t min_pong = 0, now = mstime();
3070     clusterNode *min_pong_node = NULL;
3071     static unsigned long long iteration = 0;
3072     mstime_t handshake_timeout;
3073 
3074     iteration++; /* Number of times this function was called so far. */
3075 
3076     /* The handshake timeout is the time after which a handshake node that was
3077      * not turned into a normal node is removed from the nodes table. Usually
3078      * it is just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small
3079      * we use a value of 1 second. */
3080     handshake_timeout = server.cluster_node_timeout;
3081     if (handshake_timeout < 1000) handshake_timeout = 1000;
3082 
3083     /* Check if we have disconnected nodes and re-establish the connection. */
3084     di = dictGetSafeIterator(server.cluster->nodes);
3085     while((de = dictNext(di)) != NULL) {
3086         clusterNode *node = dictGetVal(de);
3087 
3088         if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
3089 
3090         /* A Node in HANDSHAKE state has a limited lifespan equal to the
3091          * configured node timeout. */
3092         if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
3093             clusterDelNode(node);
3094             continue;
3095         }
3096 
3097         if (node->link == NULL) {
3098             int fd;
3099             mstime_t old_ping_sent;
3100             clusterLink *link;
3101 
3102             fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
3103                 node->port+CLUSTER_PORT_INCR, NET_FIRST_BIND_ADDR);
3104             if (fd == -1) {
3105                 /* We got a synchronous error from connect before
3106                  * clusterSendPing() had a chance to be called.
3107                  * If node->ping_sent is zero, failure detection can't work,
3108                  * so we claim we actually sent a ping now (that will
3109                  * be really sent as soon as the link is obtained). */
3110                 if (node->ping_sent == 0) node->ping_sent = mstime();
3111                 serverLog(LL_DEBUG, "Unable to connect to "
3112                     "Cluster Node [%s]:%d -> %s", node->ip,
3113                     node->port+CLUSTER_PORT_INCR,
3114                     server.neterr);
3115                 continue;
3116             }
3117             link = createClusterLink(node);
3118             link->fd = fd;
3119             node->link = link;
3120             aeCreateFileEvent(server.el,link->fd,AE_READABLE,
3121                     clusterReadHandler,link);
3122             /* Queue a PING in the new connection ASAP: this is crucial
3123              * to avoid false positives in failure detection.
3124              *
3125              * If the node is flagged as MEET, we send a MEET message instead
3126              * of a PING one, to force the receiver to add us in its node
3127              * table. */
3128             old_ping_sent = node->ping_sent;
3129             clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
3130                     CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
3131             if (old_ping_sent) {
3132                 /* If there was an active ping before the link was
3133                  * disconnected, we want to restore the ping time; otherwise
3134                  * it would be replaced by the clusterSendPing() call. */
3135                 node->ping_sent = old_ping_sent;
3136             }
3137             /* We can clear the flag after the first packet is sent.
3138              * If we'll never receive a PONG, we'll never send new packets
3139              * to this node. Instead after the PONG is received and we
3140              * are no longer in meet/handshake status, we want to send
3141              * normal PING packets. */
3142             node->flags &= ~CLUSTER_NODE_MEET;
3143 
3144             serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
3145                     node->name, node->ip, node->port+CLUSTER_PORT_INCR);
3146         }
3147     }
3148     dictReleaseIterator(di);
3149 
3150     /* Ping some random node once every 10 iterations, so that we usually
3151      * ping one random node per second. */
3152     if (!(iteration % 10)) {
3153         int j;
3154 
3155         /* Check a few random nodes and ping the one with the oldest
3156          * pong_received time. */
3157         for (j = 0; j < 5; j++) {
3158             de = dictGetRandomKey(server.cluster->nodes);
3159             clusterNode *this = dictGetVal(de);
3160 
3161             /* Don't ping nodes disconnected or with a ping currently active. */
3162             if (this->link == NULL || this->ping_sent != 0) continue;
3163             if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
3164                 continue;
3165             if (min_pong_node == NULL || min_pong > this->pong_received) {
3166                 min_pong_node = this;
3167                 min_pong = this->pong_received;
3168             }
3169         }
3170         if (min_pong_node) {
3171             serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
3172             clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
3173         }
3174     }
3175 
3176     /* Iterate over the nodes to check if we need to flag something as
3177      * failing. This loop is also responsible for:
3178      * 1) Checking if there are orphaned masters (masters without non-failing
3179      *    slaves).
3180      * 2) Counting the max number of non-failing slaves for a single master.
3181      * 3) Counting the number of slaves for our master, if we are a slave. */
3182     orphaned_masters = 0;
3183     max_slaves = 0;
3184     this_slaves = 0;
3185     di = dictGetSafeIterator(server.cluster->nodes);
3186     while((de = dictNext(di)) != NULL) {
3187         clusterNode *node = dictGetVal(de);
3188         now = mstime(); /* Use an updated time at every iteration. */
3189         mstime_t delay;
3190 
3191         if (node->flags &
3192             (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
3193                 continue;
3194 
3195         /* Orphaned master check, useful only if the current instance
3196          * is a slave that may migrate to another master. */
3197         if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
3198             int okslaves = clusterCountNonFailingSlaves(node);
3199 
3200             /* A master is orphaned if it is serving a non-zero number of
3201              * slots, has no working slaves, but used to have at least one
3202              * slave, or failed over a master that used to have slaves. */
3203             if (okslaves == 0 && node->numslots > 0 &&
3204                 node->flags & CLUSTER_NODE_MIGRATE_TO)
3205             {
3206                 orphaned_masters++;
3207             }
3208             if (okslaves > max_slaves) max_slaves = okslaves;
3209             if (nodeIsSlave(myself) && myself->slaveof == node)
3210                 this_slaves = okslaves;
3211         }
3212 
3213         /* If we are waiting for the PONG more than half the cluster
3214          * timeout, reconnect the link: maybe there is a connection
3215          * issue even if the node is alive. */
3216         if (node->link && /* is connected */
3217             now - node->link->ctime >
3218             server.cluster_node_timeout && /* was not already reconnected */
3219             node->ping_sent && /* we already sent a ping */
3220             node->pong_received < node->ping_sent && /* still waiting pong */
3221             /* and we are waiting for the pong more than timeout/2 */
3222             now - node->ping_sent > server.cluster_node_timeout/2)
3223         {
3224             /* Disconnect the link, it will be reconnected automatically. */
3225             freeClusterLink(node->link);
3226         }
3227 
3228         /* If we currently have no active ping for this instance, and the
3229          * received PONG is older than half the cluster timeout, send
3230          * a new ping now, to ensure all the nodes are pinged without
3231          * too big a delay. */
3232         if (node->link &&
3233             node->ping_sent == 0 &&
3234             (now - node->pong_received) > server.cluster_node_timeout/2)
3235         {
3236             clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3237             continue;
3238         }
3239 
3240         /* If we are a master and one of the slaves requested a manual
3241          * failover, ping it continuously. */
3242         if (server.cluster->mf_end &&
3243             nodeIsMaster(myself) &&
3244             server.cluster->mf_slave == node &&
3245             node->link)
3246         {
3247             clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3248             continue;
3249         }
3250 
3251         /* Check only if we have an active ping for this instance. */
3252         if (node->ping_sent == 0) continue;
3253 
3254         /* Compute the delay of the PONG. Note that if we already received
3255          * the PONG, then node->ping_sent is zero, so we can't reach this
3256          * code at all. */
3257         delay = now - node->ping_sent;
3258 
3259         if (delay > server.cluster_node_timeout) {
3260             /* Timeout reached. Set the node as possibly failing if it is
3261              * not already in this state. */
3262             if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
3263                 serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
3264                     node->name);
3265                 node->flags |= CLUSTER_NODE_PFAIL;
3266                 update_state = 1;
3267             }
3268         }
3269     }
3270     dictReleaseIterator(di);
3271 
3272     /* If we are a slave node but the replication is still turned off,
3273      * enable it if we know the address of our master and it appears to
3274      * be up. */
3275     if (nodeIsSlave(myself) &&
3276         server.masterhost == NULL &&
3277         myself->slaveof &&
3278         nodeHasAddr(myself->slaveof))
3279     {
3280         replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
3281     }
3282 
3283     /* Abort a manual failover if the timeout is reached. */
3284     manualFailoverCheckTimeout();
3285 
3286     if (nodeIsSlave(myself)) {
3287         clusterHandleManualFailover();
3288         clusterHandleSlaveFailover();
3289         /* If there are orphaned masters, and we are a slave of one of the
3290          * masters with the max number of non-failing slaves, consider
3291          * migrating to the orphaned masters. Note that it does not make
3292          * sense to try a migration if there is no master with at least
3293          * *two* working slaves. */
3294         if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
3295             clusterHandleSlaveMigration(max_slaves);
3296     }
3297 
3298     if (update_state || server.cluster->state == CLUSTER_FAIL)
3299         clusterUpdateState();
3300 }
3301 
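/* Illustrative sketch (editor's addition, not part of the original file):
 * the sampling strategy used by the cron above: once per second, probe a
 * few random nodes and ping the one whose PONG is oldest. Reduced to the
 * selection arithmetic with hypothetical inputs; like the loop above, it
 * samples with replacement. rand() is from <stdlib.h>. */
static int examplePickStalestNode(const long long *pong_received, int n) {
    int best = -1, j;
    for (j = 0; j < 5 && n > 0; j++) {
        int r = rand() % n;  /* Random candidate index. */
        if (best == -1 || pong_received[r] < pong_received[best]) best = r;
    }
    return best;  /* Index of the sampled node with the oldest PONG. */
}
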
3302 /* This function is called before the event handler returns to sleep for
3303  * events. It is useful to perform operations that must be done ASAP in
3304  * reaction to events fired but that are not safe to perform inside event
3305  * handlers, or to perform potentially expensive tasks that we need to do
3306  * a single time before replying to clients. */
3307 void clusterBeforeSleep(void) {
3308     /* Handle failover, this is needed when it is likely that there is already
3309      * the quorum from masters in order to react fast. */
3310     if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)
3311         clusterHandleSlaveFailover();
3312 
3313     /* Update the cluster state. */
3314     if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)
3315         clusterUpdateState();
3316 
3317     /* Save the config, possibly using fsync. */
3318     if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {
3319         int fsync = server.cluster->todo_before_sleep &
3320                     CLUSTER_TODO_FSYNC_CONFIG;
3321         clusterSaveConfigOrDie(fsync);
3322     }
3323 
3324     /* Reset our flags (not strictly needed since every single function
3325      * called for a set flag should be able to clear its own flag). */
3326     server.cluster->todo_before_sleep = 0;
3327 }
3328 
3329 void clusterDoBeforeSleep(int flags) {
3330     server.cluster->todo_before_sleep |= flags;
3331 }
3332 
3333 /* -----------------------------------------------------------------------------
3334  * Slots management
3335  * -------------------------------------------------------------------------- */
3336 
3337 /* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set,
3338  * otherwise 0. */
3339 int bitmapTestBit(unsigned char *bitmap, int pos) {
3340     off_t byte = pos/8;
3341     int bit = pos&7;
3342     return (bitmap[byte] & (1<<bit)) != 0;
3343 }
3344 
3345 /* Set the bit at position 'pos' in a bitmap. */
3346 void bitmapSetBit(unsigned char *bitmap, int pos) {
3347     off_t byte = pos/8;
3348     int bit = pos&7;
3349     bitmap[byte] |= 1<<bit;
3350 }
3351 
3352 /* Clear the bit at position 'pos' in a bitmap. */
3353 void bitmapClearBit(unsigned char *bitmap, int pos) {
3354     off_t byte = pos/8;
3355     int bit = pos&7;
3356     bitmap[byte] &= ~(1<<bit);
3357 }
3358 
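/* Illustrative sketch (editor's addition, not part of the original file):
 * the slot bitmaps above pack CLUSTER_SLOTS (16384) bits into 16384/8 =
 * 2048 bytes. A quick self-contained exercise of the three helpers: */
static void exampleBitmapUsage(void) {
    unsigned char slots[CLUSTER_SLOTS/8] = {0};  /* All slots initially clear. */
    bitmapSetBit(slots,5000);
    serverAssert(bitmapTestBit(slots,5000) == 1);
    serverAssert(bitmapTestBit(slots,5001) == 0);
    bitmapClearBit(slots,5000);
    serverAssert(bitmapTestBit(slots,5000) == 0);
}
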
3359 /* Return non-zero if there is at least one master with slaves in the cluster.
3360  * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
3361  * MIGRATE_TO flag when a master gets its first slot. */
3362 int clusterMastersHaveSlaves(void) {
3363     dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3364     dictEntry *de;
3365     int slaves = 0;
3366     while((de = dictNext(di)) != NULL) {
3367         clusterNode *node = dictGetVal(de);
3368 
3369         if (nodeIsSlave(node)) continue;
3370         slaves += node->numslaves;
3371     }
3372     dictReleaseIterator(di);
3373     return slaves != 0;
3374 }
3375 
3376 /* Set the slot bit and return the old value. */
3377 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
3378     int old = bitmapTestBit(n->slots,slot);
3379     bitmapSetBit(n->slots,slot);
3380     if (!old) {
3381         n->numslots++;
3382         /* When a master gets its first slot, even if it has no slaves,
3383          * it gets flagged with MIGRATE_TO, that is, the master is a valid
3384          * target for replica migration, if and only if at least one of
3385          * the other masters has slaves right now.
3386          *
3387          * Normally masters are valid targets of replica migration if:
3388          * 1. They used to have slaves (but no longer have them).
3389          * 2. They are slaves that failed over a master that used to have slaves.
3390          *
3391          * However new masters with slots assigned are considered valid
3392          * migration targets if the rest of the cluster is not slave-less.
3393          *
3394          * See https://github.com/antirez/redis/issues/3043 for more info. */
3395         if (n->numslots == 1 && clusterMastersHaveSlaves())
3396             n->flags |= CLUSTER_NODE_MIGRATE_TO;
3397     }
3398     return old;
3399 }
3400 
3401 /* Clear the slot bit and return the old value. */
3402 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
3403     int old = bitmapTestBit(n->slots,slot);
3404     bitmapClearBit(n->slots,slot);
3405     if (old) n->numslots--;
3406     return old;
3407 }
3408 
3409 /* Return the slot bit from the cluster node structure. */
3410 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
3411     return bitmapTestBit(n->slots,slot);
3412 }
3413 
3414 /* Add the specified slot to the list of slots that node 'n' will
3415  * serve. Return C_OK if the operation ended with success.
3416  * If the slot is already assigned to another instance this is considered
3417  * an error and C_ERR is returned. */
3418 int clusterAddSlot(clusterNode *n, int slot) {
3419     if (server.cluster->slots[slot]) return C_ERR;
3420     clusterNodeSetSlotBit(n,slot);
3421     server.cluster->slots[slot] = n;
3422     return C_OK;
3423 }
3424 
3425 /* Delete the specified slot marking it as unassigned.
3426  * Returns C_OK if the slot was assigned, otherwise if the slot was
3427  * already unassigned C_ERR is returned. */
3428 int clusterDelSlot(int slot) {
3429     clusterNode *n = server.cluster->slots[slot];
3430 
3431     if (!n) return C_ERR;
3432     serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
3433     server.cluster->slots[slot] = NULL;
3434     return C_OK;
3435 }
3436 
3437 /* Delete all the slots associated with the specified node.
3438  * The number of deleted slots is returned. */
3439 int clusterDelNodeSlots(clusterNode *node) {
3440     int deleted = 0, j;
3441 
3442     for (j = 0; j < CLUSTER_SLOTS; j++) {
3443         if (clusterNodeGetSlotBit(node,j) == 0) continue;
3444         clusterDelSlot(j); deleted++;
3445     }
3446     return deleted;
3447 }
3448 
3449 /* Clear the migrating / importing state for all the slots.
3450  * This is useful at initialization and when turning a master into a slave. */
3451 void clusterCloseAllSlots(void) {
3452     memset(server.cluster->migrating_slots_to,0,
3453         sizeof(server.cluster->migrating_slots_to));
3454     memset(server.cluster->importing_slots_from,0,
3455         sizeof(server.cluster->importing_slots_from));
3456 }
3457 
3458 /* -----------------------------------------------------------------------------
3459  * Cluster state evaluation function
3460  * -------------------------------------------------------------------------- */
3461 
3462 /* The following are defines that are only used in the evaluation function
3463  * and are based on heuristics. Actually the main point about the rejoin and
3464  * writable delay is that they should be a few orders of magnitude larger
3465  * than the network latency. */
3466 #define CLUSTER_MAX_REJOIN_DELAY 5000
3467 #define CLUSTER_MIN_REJOIN_DELAY 500
3468 #define CLUSTER_WRITABLE_DELAY 2000
3469 
3470 void clusterUpdateState(void) {
3471     int j, new_state;
3472     int reachable_masters = 0;
3473     static mstime_t among_minority_time;
3474     static mstime_t first_call_time = 0;
3475 
3476     server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
3477 
3478     /* If this is a master node, wait some time before turning the state
3479      * into OK, since it is not a good idea to rejoin the cluster as a writable
3480      * master, after a reboot, without giving the cluster a chance to
3481      * reconfigure this node. Note that the delay is calculated starting from
3482  * the first call to this function and not since the server start, in order
3483  * not to count the DB loading time. */
3484     if (first_call_time == 0) first_call_time = mstime();
3485     if (nodeIsMaster(myself) &&
3486         server.cluster->state == CLUSTER_FAIL &&
3487         mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;
3488 
3489     /* Start assuming the state is OK. We'll turn it into FAIL if there
3490      * are the right conditions. */
3491     new_state = CLUSTER_OK;
3492 
3493     /* Check if all the slots are covered. */
3494     if (server.cluster_require_full_coverage) {
3495         for (j = 0; j < CLUSTER_SLOTS; j++) {
3496             if (server.cluster->slots[j] == NULL ||
3497                 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
3498             {
3499                 new_state = CLUSTER_FAIL;
3500                 break;
3501             }
3502         }
3503     }
3504 
3505     /* Compute the cluster size, that is the number of master nodes
3506      * serving at least a single slot.
3507      *
3508      * At the same time count the number of reachable masters having
3509      * at least one slot. */
3510     {
3511         dictIterator *di;
3512         dictEntry *de;
3513 
3514         server.cluster->size = 0;
3515         di = dictGetSafeIterator(server.cluster->nodes);
3516         while((de = dictNext(di)) != NULL) {
3517             clusterNode *node = dictGetVal(de);
3518 
3519             if (nodeIsMaster(node) && node->numslots) {
3520                 server.cluster->size++;
3521                 if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
3522                     reachable_masters++;
3523             }
3524         }
3525         dictReleaseIterator(di);
3526     }
3527 
3528     /* If we are in a minority partition, change the cluster state
3529      * to FAIL. */
3530     {
3531         int needed_quorum = (server.cluster->size / 2) + 1;
3532 
3533         if (reachable_masters < needed_quorum) {
3534             new_state = CLUSTER_FAIL;
3535             among_minority_time = mstime();
3536         }
3537     }
3538 
3539     /* Log a state change */
3540     if (new_state != server.cluster->state) {
3541         mstime_t rejoin_delay = server.cluster_node_timeout;
3542 
3543         /* If the instance is a master and was partitioned away with the
3544          * minority, don't let it accept queries for some time after the
3545          * partition heals, to make sure there is enough time to receive
3546          * a configuration update. */
3547         if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
3548             rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
3549         if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
3550             rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
3551 
3552         if (new_state == CLUSTER_OK &&
3553             nodeIsMaster(myself) &&
3554             mstime() - among_minority_time < rejoin_delay)
3555         {
3556             return;
3557         }
3558 
3559         /* Change the state and log the event. */
3560         serverLog(LL_WARNING,"Cluster state changed: %s",
3561             new_state == CLUSTER_OK ? "ok" : "fail");
3562         server.cluster->state = new_state;
3563     }
3564 }
3565 
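/* Illustrative sketch (editor's addition, not part of the original file):
 * the rejoin delay used above is just the node timeout clamped into
 * [CLUSTER_MIN_REJOIN_DELAY, CLUSTER_MAX_REJOIN_DELAY]: */
static mstime_t exampleRejoinDelay(mstime_t node_timeout) {
    if (node_timeout > CLUSTER_MAX_REJOIN_DELAY) return CLUSTER_MAX_REJOIN_DELAY;
    if (node_timeout < CLUSTER_MIN_REJOIN_DELAY) return CLUSTER_MIN_REJOIN_DELAY;
    return node_timeout; /* e.g. 15000 -> 5000, 300 -> 500, 3000 -> 3000. */
}
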
3566 /* This function is called after the node startup in order to verify that data
3567  * loaded from disk is in agreement with the cluster configuration:
3568  *
3569  * 1) If we find keys belonging to hash slots we are not responsible for, the
3570  *    following happens:
3571  *    A) If no other node is in charge according to the current cluster
3572  *       configuration, we add these slots to our node.
3573  *    B) If according to our config other nodes are already in charge of
3574  *       these slots, we set the slots to IMPORTING from our point of view
3575  *       in order to justify that we have those slots, and to make
3576  *       redis-trib aware of the issue, so that it can try to fix it.
3577  * 2) If we find data in a DB different than DB0 we return C_ERR to
3578  *    signal the caller it should quit the server with an error message
3579  *    or take other actions.
3580  *
3581  * The function always returns C_OK even if it had to correct
3582  * the errors described in "1". However, if data is found in a DB
3583  * different from DB0, C_ERR is returned.
3584  *
3585  * The function also uses the logging facility in order to warn the user
3586  * about desynchronizations between the data we have in memory and the
3587  * cluster configuration. */
3588 int verifyClusterConfigWithData(void) {
3589     int j;
3590     int update_config = 0;
3591 
3592     /* If this node is a slave, don't perform the check at all as we
3593      * completely depend on the replication stream. */
3594     if (nodeIsSlave(myself)) return C_OK;
3595 
3596     /* Make sure we only have keys in DB0. */
3597     for (j = 1; j < server.dbnum; j++) {
3598         if (dictSize(server.db[j].dict)) return C_ERR;
3599     }
3600 
3601     /* Check that all the slots we see populated in memory have a
3602      * corresponding entry in the cluster table. Otherwise fix the table. */
3603     for (j = 0; j < CLUSTER_SLOTS; j++) {
3604         if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
3605         /* Check if we are assigned to this slot or if we are importing it.
3606          * In both cases move on to the next slot, as the configuration
3607          * makes sense. */
3608         if (server.cluster->slots[j] == myself ||
3609             server.cluster->importing_slots_from[j] != NULL) continue;
3610 
3611         /* If we are here, data and cluster config don't agree: we have
3612          * slot 'j' populated even though we are not importing it, nor are
3613          * we assigned to this slot. Fix this condition. */
3614 
3615         update_config++;
3616         /* Case A: slot is unassigned. Take responsibility for it. */
3617         if (server.cluster->slots[j] == NULL) {
3618             serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
3619                                     "Taking responsibility for it.",j);
3620             clusterAddSlot(myself,j);
3621         } else {
3622             serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
3623                                     "assigned to another node. "
3624                                     "Setting it to importing state.",j);
3625             server.cluster->importing_slots_from[j] = server.cluster->slots[j];
3626         }
3627     }
3628     if (update_config) clusterSaveConfigOrDie(1);
3629     return C_OK;
3630 }
3631 
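/* Editor's summary (not part of the original file) of the repair policy
 * above: keys in an unassigned slot make this node claim the slot (case A);
 * keys in a slot assigned to another node put the slot in IMPORTING state
 * so that redis-trib can resolve the conflict (case B); keys found outside
 * DB0 make the caller abort via C_ERR. */
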
3632 /* -----------------------------------------------------------------------------
3633  * SLAVE nodes handling
3634  * -------------------------------------------------------------------------- */
3635 
3636 /* Set the specified node 'n' as master for this node.
3637  * If this node is currently a master, it is turned into a slave. */
3638 void clusterSetMaster(clusterNode *n) {
3639     serverAssert(n != myself);
3640     serverAssert(myself->numslots == 0);
3641 
3642     if (nodeIsMaster(myself)) {
3643         myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
3644         myself->flags |= CLUSTER_NODE_SLAVE;
3645         clusterCloseAllSlots();
3646     } else {
3647         if (myself->slaveof)
3648             clusterNodeRemoveSlave(myself->slaveof,myself);
3649     }
3650     myself->slaveof = n;
3651     clusterNodeAddSlave(n,myself);
3652     replicationSetMaster(n->ip, n->port);
3653     resetManualFailover();
3654 }
3655 
3656 /* -----------------------------------------------------------------------------
3657  * Nodes to string representation functions.
3658  * -------------------------------------------------------------------------- */
3659 
3660 struct redisNodeFlags {
3661     uint16_t flag;
3662     char *name;
3663 };
3664 
3665 static struct redisNodeFlags redisNodeFlagsTable[] = {
3666     {CLUSTER_NODE_MYSELF,       "myself,"},
3667     {CLUSTER_NODE_MASTER,       "master,"},
3668     {CLUSTER_NODE_SLAVE,        "slave,"},
3669     {CLUSTER_NODE_PFAIL,        "fail?,"},
3670     {CLUSTER_NODE_FAIL,         "fail,"},
3671     {CLUSTER_NODE_HANDSHAKE,    "handshake,"},
3672     {CLUSTER_NODE_NOADDR,       "noaddr,"}
3673 };
3674 
3675 /* Concatenate the comma separated list of node flags to the given SDS
3676  * string 'ci'. */
3677 sds representClusterNodeFlags(sds ci, uint16_t flags) {
3678     if (flags == 0) {
3679         ci = sdscat(ci,"noflags,");
3680     } else {
3681         int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
3682         for (i = 0; i < size; i++) {
3683             struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
3684             if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
3685         }
3686     }
3687     sdsIncrLen(ci,-1); /* Remove trailing comma. */
3688     return ci;
3689 }
3690 
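/* Usage note (editor's addition, not part of the original file): for a node
 * flagged CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER the function above appends
 * "myself," and "master,", and the final sdsIncrLen(ci,-1) call trims the
 * trailing comma, yielding "myself,master". */
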
3691 /* Generate a CSV-like representation of the specified cluster node.
3692  * See clusterGenNodesDescription() top comment for more information.
3693  *
3694  * The function returns the string representation as an SDS string. */
3695 sds clusterGenNodeDescription(clusterNode *node) {
3696     int j, start;
3697     sds ci;
3698 
3699     /* Node coordinates */
3700     ci = sdscatprintf(sdsempty(),"%.40s %s:%d ",
3701         node->name,
3702         node->ip,
3703         node->port);
3704 
3705     /* Flags */
3706     ci = representClusterNodeFlags(ci, node->flags);
3707 
3708     /* Slave of... or just "-" */
3709     if (node->slaveof)
3710         ci = sdscatprintf(ci," %.40s ",node->slaveof->name);
3711     else
3712         ci = sdscatlen(ci," - ",3);
3713 
3714     /* Latency from the POV of this node, config epoch, link status */
3715     ci = sdscatprintf(ci,"%lld %lld %llu %s",
3716         (long long) node->ping_sent,
3717         (long long) node->pong_received,
3718         (unsigned long long) node->configEpoch,
3719         (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
3720                     "connected" : "disconnected");
3721 
3722     /* Slots served by this instance */
3723     start = -1;
3724     for (j = 0; j < CLUSTER_SLOTS; j++) {
3725         int bit;
3726 
3727         if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
3728             if (start == -1) start = j;
3729         }
3730         if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
3731             if (bit && j == CLUSTER_SLOTS-1) j++;
3732 
3733             if (start == j-1) {
3734                 ci = sdscatprintf(ci," %d",start);
3735             } else {
3736                 ci = sdscatprintf(ci," %d-%d",start,j-1);
3737             }
3738             start = -1;
3739         }
3740     }
3741 
3742     /* For the MYSELF node only, we also dump info about slots that
3743      * we are migrating to other instances or importing from other
3744      * instances. */
3745     if (node->flags & CLUSTER_NODE_MYSELF) {
3746         for (j = 0; j < CLUSTER_SLOTS; j++) {
3747             if (server.cluster->migrating_slots_to[j]) {
3748                 ci = sdscatprintf(ci," [%d->-%.40s]",j,
3749                     server.cluster->migrating_slots_to[j]->name);
3750             } else if (server.cluster->importing_slots_from[j]) {
3751                 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
3752                     server.cluster->importing_slots_from[j]->name);
3753             }
3754         }
3755     }
3756     return ci;
3757 }
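
/* Illustrative sketch: the slot range compaction loop above, isolated on a
 * plain 0/1 ownership array. EX_SLOTS stands in for CLUSTER_SLOTS, and
 * printf() replaces the sds accumulation. */
#include <stdio.h>

#define EX_SLOTS 16384

static void exPrintSlotRanges(const unsigned char owned[EX_SLOTS]) {
    int j, start = -1;

    for (j = 0; j < EX_SLOTS; j++) {
        int bit = owned[j];

        if (bit && start == -1) start = j;
        if (start != -1 && (!bit || j == EX_SLOTS-1)) {
            int end = (bit && j == EX_SLOTS-1) ? j : j-1;

            if (start == end)
                printf(" %d", start);          /* Single slot. */
            else
                printf(" %d-%d", start, end);  /* Contiguous range. */
            start = -1;
        }
    }
}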
3758 
3759 /* Generate a CSV-like representation of the nodes we are aware of,
3760  * including the "myself" node, and return an SDS string containing the
3761  * representation (it is up to the caller to free it).
3762  *
3763  * All the nodes matching at least one of the node flags specified in
3764  * "filter" are excluded from the output, so using zero as a filter will
3765  * include all the known nodes in the representation, including nodes in
3766  * the HANDSHAKE state.
3767  *
3768  * The representation obtained using this function is used for the output
3769  * of the CLUSTER NODES command, and as the format for the cluster
3770  * configuration file (nodes.conf) for a given node. */
3771 sds clusterGenNodesDescription(int filter) {
3772     sds ci = sdsempty(), ni;
3773     dictIterator *di;
3774     dictEntry *de;
3775 
3776     di = dictGetSafeIterator(server.cluster->nodes);
3777     while((de = dictNext(di)) != NULL) {
3778         clusterNode *node = dictGetVal(de);
3779 
3780         if (node->flags & filter) continue;
3781         ni = clusterGenNodeDescription(node);
3782         ci = sdscatsds(ci,ni);
3783         sdsfree(ni);
3784         ci = sdscatlen(ci,"\n",1);
3785     }
3786     dictReleaseIterator(di);
3787     return ci;
3788 }
3789 
3790 /* -----------------------------------------------------------------------------
3791  * CLUSTER command
3792  * -------------------------------------------------------------------------- */
3793 
3794 int getSlotOrReply(client *c, robj *o) {
3795     long long slot;
3796 
3797     if (getLongLongFromObject(o,&slot) != C_OK ||
3798         slot < 0 || slot >= CLUSTER_SLOTS)
3799     {
3800         addReplyError(c,"Invalid or out of range slot");
3801         return -1;
3802     }
3803     return (int) slot;
3804 }
3805 
3806 void clusterReplyMultiBulkSlots(client *c) {
3807     /* Format: 1) 1) start slot
3808      *            2) end slot
3809      *            3) 1) master IP
3810      *               2) master port
3811      *               3) node ID
3812      *            4) 1) replica IP
3813      *               2) replica port
3814      *               3) node ID
3815      *           ... continued until done
3816      */
3817 
3818     int num_masters = 0;
3819     void *slot_replylen = addDeferredMultiBulkLength(c);
3820 
3821     dictEntry *de;
3822     dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3823     while((de = dictNext(di)) != NULL) {
3824         clusterNode *node = dictGetVal(de);
3825         int j = 0, start = -1;
3826 
3827         /* Skip slaves (they are iterated when producing the output of their
3828          * master) and masters not serving any slot. */
3829         if (!nodeIsMaster(node) || node->numslots == 0) continue;
3830 
3831         for (j = 0; j < CLUSTER_SLOTS; j++) {
3832             int bit, i;
3833 
3834             if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
3835                 if (start == -1) start = j;
3836             }
3837             if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
3838                 int nested_elements = 3; /* slots (2) + master addr (1). */
3839                 void *nested_replylen = addDeferredMultiBulkLength(c);
3840 
3841                 if (bit && j == CLUSTER_SLOTS-1) j++;
3842 
3843                 /* If the slot exists in the output map, add to its list;
3844                  * else, create a new output map for this slot. */
3845                 if (start == j-1) {
3846                     addReplyLongLong(c, start); /* only one slot; low==high */
3847                     addReplyLongLong(c, start);
3848                 } else {
3849                     addReplyLongLong(c, start); /* low */
3850                     addReplyLongLong(c, j-1);   /* high */
3851                 }
3852                 start = -1;
3853 
3854                 /* First node reply position is always the master */
3855                 addReplyMultiBulkLen(c, 3);
3856                 addReplyBulkCString(c, node->ip);
3857                 addReplyLongLong(c, node->port);
3858                 addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
3859 
3860                 /* Remaining nodes in reply are replicas for slot range */
3861                 for (i = 0; i < node->numslaves; i++) {
3862                     /* This loop is copy/pasted from clusterGenNodeDescription()
3863                      * with modifications for per-slot node aggregation */
3864                     if (nodeFailed(node->slaves[i])) continue;
3865                     addReplyMultiBulkLen(c, 3);
3866                     addReplyBulkCString(c, node->slaves[i]->ip);
3867                     addReplyLongLong(c, node->slaves[i]->port);
3868                     addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
3869                     nested_elements++;
3870                 }
3871                 setDeferredMultiBulkLength(c, nested_replylen, nested_elements);
3872                 num_masters++;
3873             }
3874         }
3875     }
3876     dictReleaseIterator(di);
3877     setDeferredMultiBulkLength(c, slot_replylen, num_masters);
3878 }
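
/* For reference, a reply produced by the function above renders in
 * redis-cli like the following (addresses and node IDs are made up):
 *
 * 127.0.0.1:7000> CLUSTER SLOTS
 * 1) 1) (integer) 0
 *    2) (integer) 5460
 *    3) 1) "127.0.0.1"
 *       2) (integer) 7000
 *       3) "e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca"
 *    4) 1) "127.0.0.1"
 *       2) (integer) 7003
 *       3) "6ec23923021cf3ffec47632106199cb7f496ce01"
 * ...
 */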
3879 
3880 void clusterCommand(client *c) {
3881     if (server.cluster_enabled == 0) {
3882         addReplyError(c,"This instance has cluster support disabled");
3883         return;
3884     }
3885 
3886     if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
3887         long long port;
3888 
3889         if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
3890             addReplyErrorFormat(c,"Invalid TCP port specified: %s",
3891                                 (char*)c->argv[3]->ptr);
3892             return;
3893         }
3894 
3895         if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
3896             errno == EINVAL)
3897         {
3898             addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
3899                             (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
3900         } else {
3901             addReply(c,shared.ok);
3902         }
3903     } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
3904         /* CLUSTER NODES */
3905         robj *o;
3906         sds ci = clusterGenNodesDescription(0);
3907 
3908         o = createObject(OBJ_STRING,ci);
3909         addReplyBulk(c,o);
3910         decrRefCount(o);
3911     } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
3912         /* CLUSTER MYID */
3913         addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
3914     } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
3915         /* CLUSTER SLOTS */
3916         clusterReplyMultiBulkSlots(c);
3917     } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
3918         /* CLUSTER FLUSHSLOTS */
3919         if (dictSize(server.db[0].dict) != 0) {
3920             addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
3921             return;
3922         }
3923         clusterDelNodeSlots(myself);
3924         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
3925         addReply(c,shared.ok);
3926     } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
3927                !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
3928     {
3929         /* CLUSTER ADDSLOTS <slot> [slot] ... */
3930         /* CLUSTER DELSLOTS <slot> [slot] ... */
3931         int j, slot;
3932         unsigned char *slots = zmalloc(CLUSTER_SLOTS);
3933         int del = !strcasecmp(c->argv[1]->ptr,"delslots");
3934 
3935         memset(slots,0,CLUSTER_SLOTS);
3936         /* Check that all the arguments are parseable and that the slots are
3937          * not already busy (ADDSLOTS) or already unassigned (DELSLOTS). */
3938         for (j = 2; j < c->argc; j++) {
3939             if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
3940                 zfree(slots);
3941                 return;
3942             }
3943             if (del && server.cluster->slots[slot] == NULL) {
3944                 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
3945                 zfree(slots);
3946                 return;
3947             } else if (!del && server.cluster->slots[slot]) {
3948                 addReplyErrorFormat(c,"Slot %d is already busy", slot);
3949                 zfree(slots);
3950                 return;
3951             }
3952             if (slots[slot]++ == 1) {
3953                 addReplyErrorFormat(c,"Slot %d specified multiple times",
3954                     (int)slot);
3955                 zfree(slots);
3956                 return;
3957             }
3958         }
3959         for (j = 0; j < CLUSTER_SLOTS; j++) {
3960             if (slots[j]) {
3961                 int retval;
3962 
3963                 /* If this slot was set as importing we can clear this
3964                  * state as now we are the real owner of the slot. */
3965                 if (server.cluster->importing_slots_from[j])
3966                     server.cluster->importing_slots_from[j] = NULL;
3967 
3968                 retval = del ? clusterDelSlot(j) :
3969                                clusterAddSlot(myself,j);
3970                 serverAssertWithInfo(c,NULL,retval == C_OK);
3971             }
3972         }
3973         zfree(slots);
3974         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
3975         addReply(c,shared.ok);
3976     } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
3977         /* SETSLOT 10 MIGRATING <node ID> */
3978         /* SETSLOT 10 IMPORTING <node ID> */
3979         /* SETSLOT 10 STABLE */
3980         /* SETSLOT 10 NODE <node ID> */
3981         int slot;
3982         clusterNode *n;
3983 
3984         if (nodeIsSlave(myself)) {
3985             addReplyError(c,"Please use SETSLOT only with masters.");
3986             return;
3987         }
3988 
3989         if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
3990 
3991         if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
3992             if (server.cluster->slots[slot] != myself) {
3993                 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
3994                 return;
3995             }
3996             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
3997                 addReplyErrorFormat(c,"I don't know about node %s",
3998                     (char*)c->argv[4]->ptr);
3999                 return;
4000             }
4001             server.cluster->migrating_slots_to[slot] = n;
4002         } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
4003             if (server.cluster->slots[slot] == myself) {
4004                 addReplyErrorFormat(c,
4005                     "I'm already the owner of hash slot %u",slot);
4006                 return;
4007             }
4008             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4009                 addReplyErrorFormat(c,"I don't know about node %s",
4010                     (char*)c->argv[4]->ptr);
4011                 return;
4012             }
4013             server.cluster->importing_slots_from[slot] = n;
4014         } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
4015             /* CLUSTER SETSLOT <SLOT> STABLE */
4016             server.cluster->importing_slots_from[slot] = NULL;
4017             server.cluster->migrating_slots_to[slot] = NULL;
4018         } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
4019             /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
4020             clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
4021 
4022             if (!n) {
4023                 addReplyErrorFormat(c,"Unknown node %s",
4024                     (char*)c->argv[4]->ptr);
4025                 return;
4026             }
4027             /* If this hash slot was served by 'myself' before the switch,
4028              * make sure there are no longer local keys for this hash slot. */
4029             if (server.cluster->slots[slot] == myself && n != myself) {
4030                 if (countKeysInSlot(slot) != 0) {
4031                     addReplyErrorFormat(c,
4032                         "Can't assign hashslot %d to a different node "
4033                         "while I still hold keys for this hash slot.", slot);
4034                     return;
4035                 }
4036             }
4037             /* If this slot is in migrating status but we have no keys
4038              * for it, assigning the slot to another node will clear
4039              * the migrating status. */
4040             if (countKeysInSlot(slot) == 0 &&
4041                 server.cluster->migrating_slots_to[slot])
4042                 server.cluster->migrating_slots_to[slot] = NULL;
4043 
4044             /* If this node was importing this slot, assigning the slot to
4045              * itself also clears the importing status. */
4046             if (n == myself &&
4047                 server.cluster->importing_slots_from[slot])
4048             {
4049                 /* This slot was manually migrated, set this node configEpoch
4050                  * to a new epoch so that the new version can be propagated
4051                  * by the cluster.
4052                  *
4053                  * Note that if this ever results in a collision with another
4054                  * node getting the same configEpoch, for example because a
4055                  * failover happens at the same time we close the slot, the
4056                  * configEpoch collision resolution will fix it assigning
4057                  * a different epoch to each node. */
4058                 if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
4059                     serverLog(LL_WARNING,
4060                         "configEpoch updated after importing slot %d", slot);
4061                 }
4062                 server.cluster->importing_slots_from[slot] = NULL;
4063             }
4064             clusterDelSlot(slot);
4065             clusterAddSlot(n,slot);
4066         } else {
4067             addReplyError(c,
4068                 "Invalid CLUSTER SETSLOT action or number of arguments");
4069             return;
4070         }
4071         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
4072         addReply(c,shared.ok);
4073     } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
4074         /* CLUSTER BUMPEPOCH */
4075         int retval = clusterBumpConfigEpochWithoutConsensus();
4076         sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
4077                 (retval == C_OK) ? "BUMPED" : "STILL",
4078                 (unsigned long long) myself->configEpoch);
4079         addReplySds(c,reply);
4080     } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
4081         /* CLUSTER INFO */
4082         char *statestr[] = {"ok","fail","needhelp"};
4083         int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
4084         uint64_t myepoch;
4085         int j;
4086 
4087         for (j = 0; j < CLUSTER_SLOTS; j++) {
4088             clusterNode *n = server.cluster->slots[j];
4089 
4090             if (n == NULL) continue;
4091             slots_assigned++;
4092             if (nodeFailed(n)) {
4093                 slots_fail++;
4094             } else if (nodeTimedOut(n)) {
4095                 slots_pfail++;
4096             } else {
4097                 slots_ok++;
4098             }
4099         }
4100 
4101         myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
4102                   myself->slaveof->configEpoch : myself->configEpoch;
4103 
4104         sds info = sdscatprintf(sdsempty(),
4105             "cluster_state:%s\r\n"
4106             "cluster_slots_assigned:%d\r\n"
4107             "cluster_slots_ok:%d\r\n"
4108             "cluster_slots_pfail:%d\r\n"
4109             "cluster_slots_fail:%d\r\n"
4110             "cluster_known_nodes:%lu\r\n"
4111             "cluster_size:%d\r\n"
4112             "cluster_current_epoch:%llu\r\n"
4113             "cluster_my_epoch:%llu\r\n"
4114             "cluster_stats_messages_sent:%lld\r\n"
4115             "cluster_stats_messages_received:%lld\r\n"
4116             , statestr[server.cluster->state],
4117             slots_assigned,
4118             slots_ok,
4119             slots_pfail,
4120             slots_fail,
4121             dictSize(server.cluster->nodes),
4122             server.cluster->size,
4123             (unsigned long long) server.cluster->currentEpoch,
4124             (unsigned long long) myepoch,
4125             server.cluster->stats_bus_messages_sent,
4126             server.cluster->stats_bus_messages_received
4127         );
4128         addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
4129             (unsigned long)sdslen(info)));
4130         addReplySds(c,info);
4131         addReply(c,shared.crlf);
4132     } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
4133         int retval = clusterSaveConfig(1);
4134 
4135         if (retval == 0)
4136             addReply(c,shared.ok);
4137         else
4138             addReplyErrorFormat(c,"error saving the cluster node config: %s",
4139                 strerror(errno));
4140     } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
4141         /* CLUSTER KEYSLOT <key> */
4142         sds key = c->argv[2]->ptr;
4143 
4144         addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
4145     } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
4146         /* CLUSTER COUNTKEYSINSLOT <slot> */
4147         long long slot;
4148 
4149         if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4150             return;
4151         if (slot < 0 || slot >= CLUSTER_SLOTS) {
4152             addReplyError(c,"Invalid slot");
4153             return;
4154         }
4155         addReplyLongLong(c,countKeysInSlot(slot));
4156     } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
4157         /* CLUSTER GETKEYSINSLOT <slot> <count> */
4158         long long maxkeys, slot;
4159         unsigned int numkeys, j;
4160         robj **keys;
4161 
4162         if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4163             return;
4164         if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
4165             != C_OK)
4166             return;
4167         if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
4168             addReplyError(c,"Invalid slot or number of keys");
4169             return;
4170         }
4171 
4172         keys = zmalloc(sizeof(robj*)*maxkeys);
4173         numkeys = getKeysInSlot(slot, keys, maxkeys);
4174         addReplyMultiBulkLen(c,numkeys);
4175         for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
4176         zfree(keys);
4177     } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
4178         /* CLUSTER FORGET <NODE ID> */
4179         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4180 
4181         if (!n) {
4182             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4183             return;
4184         } else if (n == myself) {
4185             addReplyError(c,"I tried hard but I can't forget myself...");
4186             return;
4187         } else if (nodeIsSlave(myself) && myself->slaveof == n) {
4188             addReplyError(c,"Can't forget my master!");
4189             return;
4190         }
4191         clusterBlacklistAddNode(n);
4192         clusterDelNode(n);
4193         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4194                              CLUSTER_TODO_SAVE_CONFIG);
4195         addReply(c,shared.ok);
4196     } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
4197         /* CLUSTER REPLICATE <NODE ID> */
4198         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4199 
4200         /* Lookup the specified node in our table. */
4201         if (!n) {
4202             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4203             return;
4204         }
4205 
4206         /* I can't replicate myself. */
4207         if (n == myself) {
4208             addReplyError(c,"Can't replicate myself");
4209             return;
4210         }
4211 
4212         /* Can't replicate a slave. */
4213         if (nodeIsSlave(n)) {
4214             addReplyError(c,"I can only replicate a master, not a slave.");
4215             return;
4216         }
4217 
4218         /* If the instance is currently a master, it should have no assigned
4219          * slots nor keys to accept to replicate some other node.
4220          * Slaves can switch to another master without issues. */
4221         if (nodeIsMaster(myself) &&
4222             (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
4223             addReplyError(c,
4224                 "To set a master the node must be empty and "
4225                 "without assigned slots.");
4226             return;
4227         }
4228 
4229         /* Set the master. */
4230         clusterSetMaster(n);
4231         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4232         addReply(c,shared.ok);
4233     } else if (!strcasecmp(c->argv[1]->ptr,"slaves") && c->argc == 3) {
4234         /* CLUSTER SLAVES <NODE ID> */
4235         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4236         int j;
4237 
4238         /* Lookup the specified node in our table. */
4239         if (!n) {
4240             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4241             return;
4242         }
4243 
4244         if (nodeIsSlave(n)) {
4245             addReplyError(c,"The specified node is not a master");
4246             return;
4247         }
4248 
4249         addReplyMultiBulkLen(c,n->numslaves);
4250         for (j = 0; j < n->numslaves; j++) {
4251             sds ni = clusterGenNodeDescription(n->slaves[j]);
4252             addReplyBulkCString(c,ni);
4253             sdsfree(ni);
4254         }
4255     } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
4256                c->argc == 3)
4257     {
4258         /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
4259         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4260 
4261         if (!n) {
4262             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4263             return;
4264         } else {
4265             addReplyLongLong(c,clusterNodeFailureReportsCount(n));
4266         }
4267     } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
4268                (c->argc == 2 || c->argc == 3))
4269     {
4270         /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
4271         int force = 0, takeover = 0;
4272 
4273         if (c->argc == 3) {
4274             if (!strcasecmp(c->argv[2]->ptr,"force")) {
4275                 force = 1;
4276             } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
4277                 takeover = 1;
4278                 force = 1; /* Takeover also implies force. */
4279             } else {
4280                 addReply(c,shared.syntaxerr);
4281                 return;
4282             }
4283         }
4284 
4285         /* Check preconditions. */
4286         if (nodeIsMaster(myself)) {
4287             addReplyError(c,"You should send CLUSTER FAILOVER to a slave");
4288             return;
4289         } else if (myself->slaveof == NULL) {
4290             addReplyError(c,"I'm a slave but my master is unknown to me");
4291             return;
4292         } else if (!force &&
4293                    (nodeFailed(myself->slaveof) ||
4294                     myself->slaveof->link == NULL))
4295         {
4296             addReplyError(c,"Master is down or failed, "
4297                             "please use CLUSTER FAILOVER FORCE");
4298             return;
4299         }
4300         resetManualFailover();
4301         server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
4302 
4303         if (takeover) {
4304             /* A takeover does not perform any initial check. It just
4305              * generates a new configuration epoch for this node without
4306              * consensus, claims the master's slots, and broadcasts the new
4307              * configuration. */
4308             serverLog(LL_WARNING,"Taking over the master (user request).");
4309             clusterBumpConfigEpochWithoutConsensus();
4310             clusterFailoverReplaceYourMaster();
4311         } else if (force) {
4312             /* If this is a forced failover, we don't need to talk with our
4313              * master to agree about the offset. We just fail over, taking
4314              * over the master without coordination. */
4315             serverLog(LL_WARNING,"Forced failover user request accepted.");
4316             server.cluster->mf_can_start = 1;
4317         } else {
4318             serverLog(LL_WARNING,"Manual failover user request accepted.");
4319             clusterSendMFStart(myself->slaveof);
4320         }
4321         addReply(c,shared.ok);
4322     } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
4323     {
4324         /* CLUSTER SET-CONFIG-EPOCH <epoch>
4325          *
4326          * The user is allowed to set the config epoch only when a node is
4327          * totally fresh: no config epoch, no other known node, and so forth.
4328          * This happens at cluster creation time to start with a cluster where
4329          * every node has a different node ID, without relying on the conflict
4330          * resolution system, which is too slow when a big cluster is created. */
4331         long long epoch;
4332 
4333         if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
4334             return;
4335 
4336         if (epoch < 0) {
4337             addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
4338         } else if (dictSize(server.cluster->nodes) > 1) {
4339             addReplyError(c,"The user can assign a config epoch only when the "
4340                             "node does not know any other node.");
4341         } else if (myself->configEpoch != 0) {
4342             addReplyError(c,"Node config epoch is already non-zero");
4343         } else {
4344             myself->configEpoch = epoch;
4345             serverLog(LL_WARNING,
4346                 "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
4347                 (unsigned long long) myself->configEpoch);
4348 
4349             if (server.cluster->currentEpoch < (uint64_t)epoch)
4350                 server.cluster->currentEpoch = epoch;
4351             /* No need to fsync the config here since in the unlucky event
4352              * of a failure to persist the config, the conflict resolution code
4353              * will assign a unique config to this node. */
4354             clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4355                                  CLUSTER_TODO_SAVE_CONFIG);
4356             addReply(c,shared.ok);
4357         }
4358     } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
4359                (c->argc == 2 || c->argc == 3))
4360     {
4361         /* CLUSTER RESET [SOFT|HARD] */
4362         int hard = 0;
4363 
4364         /* Parse soft/hard argument. Default is soft. */
4365         if (c->argc == 3) {
4366             if (!strcasecmp(c->argv[2]->ptr,"hard")) {
4367                 hard = 1;
4368             } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
4369                 hard = 0;
4370             } else {
4371                 addReply(c,shared.syntaxerr);
4372                 return;
4373             }
4374         }
4375 
4376         /* Slaves can be reset while containing data, but not master nodes
4377          * that must be empty. */
4378         if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
4379             addReplyError(c,"CLUSTER RESET can't be called with "
4380                             "master nodes containing keys");
4381             return;
4382         }
4383         clusterReset(hard);
4384         addReply(c,shared.ok);
4385     } else {
4386         addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
4387     }
4388 }
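
/* For reference, the sequence a resharding tool drives with the SETSLOT,
 * GETKEYSINSLOT and MIGRATE subcommands above, to move slot 1000 from
 * node A to node B (slot number, batch size and timeout are arbitrary
 * example values):
 *
 * 1. On B: CLUSTER SETSLOT 1000 IMPORTING <A-node-id>
 * 2. On A: CLUSTER SETSLOT 1000 MIGRATING <B-node-id>
 * 3. On A, repeated until the slot is empty:
 *        CLUSTER GETKEYSINSLOT 1000 10
 *        MIGRATE <B-host> <B-port> <key> 0 5000
 * 4. On both A and B (and propagated to the other masters):
 *        CLUSTER SETSLOT 1000 NODE <B-node-id>
 *
 * Step 4, when executed on B while it was still importing, triggers the
 * clusterBumpConfigEpochWithoutConsensus() path handled above. */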
4389 
4390 /* -----------------------------------------------------------------------------
4391  * DUMP, RESTORE and MIGRATE commands
4392  * -------------------------------------------------------------------------- */
4393 
4394 /* Generates a DUMP-format representation of the object 'o', adding it to the
4395  * io stream pointed by 'rio'. This function can't fail. */
4396 void createDumpPayload(rio *payload, robj *o) {
4397     unsigned char buf[2];
4398     uint64_t crc;
4399 
4400     /* Serialize the object in an RDB-like format. It consists of an object type
4401      * byte followed by the serialized object. This is understood by RESTORE. */
4402     rioInitWithBuffer(payload,sdsempty());
4403     serverAssert(rdbSaveObjectType(payload,o));
4404     serverAssert(rdbSaveObject(payload,o));
4405 
4406     /* Write the footer; this is how it looks:
4407      * ----------------+---------------------+---------------+
4408      * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
4409      * ----------------+---------------------+---------------+
4410      * RDB version and CRC are both in little endian.
4411      */
4412 
4413     /* RDB version */
4414     buf[0] = RDB_VERSION & 0xff;
4415     buf[1] = (RDB_VERSION >> 8) & 0xff;
4416     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
4417 
4418     /* CRC64 */
4419     crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
4420                 sdslen(payload->io.buffer.ptr));
4421     memrev64ifbe(&crc);
4422     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
4423 }
4424 
4425 /* Verify that the RDB version of the dump payload is not newer than the one
4426  * supported by this Redis instance, and that the checksum is ok.
4427  * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
4428  * is returned. */
4429 int verifyDumpPayload(unsigned char *p, size_t len) {
4430     unsigned char *footer;
4431     uint16_t rdbver;
4432     uint64_t crc;
4433 
4434     /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
4435     if (len < 10) return C_ERR;
4436     footer = p+(len-10);
4437 
4438     /* Verify RDB version */
4439     rdbver = (footer[1] << 8) | footer[0];
4440     if (rdbver > RDB_VERSION) return C_ERR;
4441 
4442     /* Verify CRC64 */
4443     crc = crc64(0,p,len-8);
4444     memrev64ifbe(&crc);
4445     return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
4446 }
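
/* Illustrative sketch: decoding the 10-byte DUMP footer in isolation.
 * Standard C only; the CRC function is injected as a parameter since the
 * real crc64() lives elsewhere in the Redis source. Like the code above,
 * this compares the CRC bytewise against the little endian footer, so a
 * big endian host would need the memrev64ifbe() step used above. */
#include <stdint.h>
#include <string.h>

static int exCheckFooter(const unsigned char *p, size_t len,
                         uint16_t max_rdb_version,
                         uint64_t (*crcfn)(const unsigned char *, size_t)) {
    const unsigned char *footer;
    uint16_t rdbver;
    uint64_t crc;

    if (len < 10) return -1;                 /* version (2) + CRC64 (8). */
    footer = p + len - 10;
    rdbver = (uint16_t)((footer[1] << 8) | footer[0]); /* Little endian. */
    if (rdbver > max_rdb_version) return -1; /* Payload from a newer RDB. */
    crc = crcfn(p, len-8);                   /* Covers payload + version. */
    return memcmp(&crc, footer+2, 8) == 0 ? 0 : -1;
}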
4447 
4448 /* DUMP keyname
4449  * DUMP is actually not used by Redis Cluster but it is the obvious
4450  * complement of RESTORE and can be useful for different applications. */
4451 void dumpCommand(client *c) {
4452     robj *o, *dumpobj;
4453     rio payload;
4454 
4455     /* Check if the key is here. */
4456     if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
4457         addReply(c,shared.nullbulk);
4458         return;
4459     }
4460 
4461     /* Create the DUMP encoded representation. */
4462     createDumpPayload(&payload,o);
4463 
4464     /* Transfer to the client */
4465     dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
4466     addReplyBulk(c,dumpobj);
4467     decrRefCount(dumpobj);
4468     return;
4469 }
4470 
4471 /* RESTORE key ttl serialized-value [REPLACE] */
4472 void restoreCommand(client *c) {
4473     long long ttl;
4474     rio payload;
4475     int j, type, replace = 0;
4476     robj *obj;
4477 
4478     /* Parse additional options */
4479     for (j = 4; j < c->argc; j++) {
4480         if (!strcasecmp(c->argv[j]->ptr,"replace")) {
4481             replace = 1;
4482         } else {
4483             addReply(c,shared.syntaxerr);
4484             return;
4485         }
4486     }
4487 
4488     /* Make sure this key does not already exist here... */
4489     if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
4490         addReply(c,shared.busykeyerr);
4491         return;
4492     }
4493 
4494     /* Check if the TTL value makes sense */
4495     if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
4496         return;
4497     } else if (ttl < 0) {
4498         addReplyError(c,"Invalid TTL value, must be >= 0");
4499         return;
4500     }
4501 
4502     /* Verify RDB version and data checksum. */
4503     if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
4504     {
4505         addReplyError(c,"DUMP payload version or checksum are wrong");
4506         return;
4507     }
4508 
4509     rioInitWithBuffer(&payload,c->argv[3]->ptr);
4510     if (((type = rdbLoadObjectType(&payload)) == -1) ||
4511         ((obj = rdbLoadObject(type,&payload)) == NULL))
4512     {
4513         addReplyError(c,"Bad data format");
4514         return;
4515     }
4516 
4517     /* Remove the old key if needed. */
4518     if (replace) dbDelete(c->db,c->argv[1]);
4519 
4520     /* Create the key and set the TTL if any */
4521     dbAdd(c->db,c->argv[1],obj);
4522     if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);
4523     signalModifiedKey(c->db,c->argv[1]);
4524     addReply(c,shared.ok);
4525     server.dirty++;
4526 }
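
/* A DUMP/RESTORE round trip as seen from redis-cli (key names made up,
 * payload abbreviated: "\x00" is the string type byte, "\x05Hello" the
 * length-prefixed value, "\a\x00" the RDB version in little endian, and
 * the elided bytes are the CRC64 footer):
 *
 * 127.0.0.1:6379> SET mykey "Hello"
 * OK
 * 127.0.0.1:6379> DUMP mykey
 * "\x00\x05Hello\a\x00..."
 * 127.0.0.1:6379> RESTORE mykey2 0 "\x00\x05Hello\a\x00..."
 * OK
 */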
4527 
4528 /* MIGRATE socket cache implementation.
4529  *
4530  * We keep a map between host:port pairs and the TCP sockets we used to
4531  * connect to those instances recently.
4532  * These sockets are closed when the max number we cache is reached, and also
4533  * in serverCron() when they have been around for more than a few seconds. */
4534 #define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
4535 #define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
4536 
4537 typedef struct migrateCachedSocket {
4538     int fd;
4539     long last_dbid;
4540     time_t last_use_time;
4541 } migrateCachedSocket;
4542 
4543 /* Return a migrateCachedSocket containing a TCP socket connected with the
4544  * target instance, possibly returning a cached one.
4545  *
4546  * This function is responsible for sending errors to the client if a
4547  * connection can't be established. In that case NULL is returned.
4548  * Otherwise on success the socket is returned, and the caller should not
4549  * attempt to free it after usage.
4550  *
4551  * If the caller detects an error while using the socket, migrateCloseSocket()
4552  * should be called so that the connection will be created from scratch
4553  * the next time. */
4554 migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
4555     int fd;
4556     sds name = sdsempty();
4557     migrateCachedSocket *cs;
4558 
4559     /* Check if we have an already cached socket for this ip:port pair. */
4560     name = sdscatlen(name,host->ptr,sdslen(host->ptr));
4561     name = sdscatlen(name,":",1);
4562     name = sdscatlen(name,port->ptr,sdslen(port->ptr));
4563     cs = dictFetchValue(server.migrate_cached_sockets,name);
4564     if (cs) {
4565         sdsfree(name);
4566         cs->last_use_time = server.unixtime;
4567         return cs;
4568     }
4569 
4570     /* No cached socket, create one. */
4571     if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
4572         /* Too many items, drop one at random. */
4573         dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
4574         cs = dictGetVal(de);
4575         close(cs->fd);
4576         zfree(cs);
4577         dictDelete(server.migrate_cached_sockets,dictGetKey(de));
4578     }
4579 
4580     /* Create the socket */
4581     fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
4582                                 atoi(c->argv[2]->ptr));
4583     if (fd == -1) {
4584         sdsfree(name);
4585         addReplyErrorFormat(c,"Can't connect to target node: %s",
4586             server.neterr);
4587         return NULL;
4588     }
4589     anetEnableTcpNoDelay(server.neterr,fd);
4590 
4591     /* Check if it connects within the specified timeout. */
4592     if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
4593         sdsfree(name);
4594         addReplySds(c,
4595             sdsnew("-IOERR error or timeout connecting to the client\r\n"));
4596         close(fd);
4597         return NULL;
4598     }
4599 
4600     /* Add to the cache and return it to the caller. */
4601     cs = zmalloc(sizeof(*cs));
4602     cs->fd = fd;
4603     cs->last_dbid = -1;
4604     cs->last_use_time = server.unixtime;
4605     dictAdd(server.migrate_cached_sockets,name,cs);
4606     return cs;
4607 }
4608 
4609 /* Free a migrate cached connection. */
4610 void migrateCloseSocket(robj *host, robj *port) {
4611     sds name = sdsempty();
4612     migrateCachedSocket *cs;
4613 
4614     name = sdscatlen(name,host->ptr,sdslen(host->ptr));
4615     name = sdscatlen(name,":",1);
4616     name = sdscatlen(name,port->ptr,sdslen(port->ptr));
4617     cs = dictFetchValue(server.migrate_cached_sockets,name);
4618     if (!cs) {
4619         sdsfree(name);
4620         return;
4621     }
4622 
4623     close(cs->fd);
4624     zfree(cs);
4625     dictDelete(server.migrate_cached_sockets,name);
4626     sdsfree(name);
4627 }
4628 
4629 void migrateCloseTimedoutSockets(void) {
4630     dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
4631     dictEntry *de;
4632 
4633     while((de = dictNext(di)) != NULL) {
4634         migrateCachedSocket *cs = dictGetVal(de);
4635 
4636         if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
4637             close(cs->fd);
4638             zfree(cs);
4639             dictDelete(server.migrate_cached_sockets,dictGetKey(de));
4640         }
4641     }
4642     dictReleaseIterator(di);
4643 }
4644 
4645 /* MIGRATE host port key dbid timeout [COPY | REPLACE]
4646  *
4647  * Or, in the multiple keys form:
4648  *
4649  * MIGRATE host port "" dbid timeout [COPY | REPLACE] KEYS key1 key2 ... keyN */
4650 void migrateCommand(client *c) {
4651     migrateCachedSocket *cs;
4652     int copy, replace, j;
4653     long timeout;
4654     long dbid;
4655     long long ttl, expireat;
4656     robj **ov = NULL; /* Objects to migrate. */
4657     robj **kv = NULL; /* Key names. */
4658     robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
4659     rio cmd, payload;
4660     int may_retry = 1;
4661     int write_error = 0;
4662 
4663     /* To support the KEYS option we need the following additional state. */
4664     int first_key = 3; /* Argument index of the first key. */
4665     int num_keys = 1;  /* By default only migrate the 'key' argument. */
4666 
4667     /* Initialization */
4668     copy = 0;
4669     replace = 0;
4670     ttl = 0;
4671 
4672     /* Parse additional options */
4673     for (j = 6; j < c->argc; j++) {
4674         if (!strcasecmp(c->argv[j]->ptr,"copy")) {
4675             copy = 1;
4676         } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
4677             replace = 1;
4678         } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
4679             if (sdslen(c->argv[3]->ptr) != 0) {
4680                 addReplyError(c,
4681                     "When using MIGRATE KEYS option, the key argument"
4682                     " must be set to the empty string");
4683                 return;
4684             }
4685             first_key = j+1;
4686             num_keys = c->argc - j - 1;
4687             break; /* All the remaining args are keys. */
4688         } else {
4689             addReply(c,shared.syntaxerr);
4690             return;
4691         }
4692     }
4693 
4694     /* Sanity check */
4695     if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
4696         getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
4697     {
4698         return;
4699     }
4700     if (timeout <= 0) timeout = 1000;
4701 
4702     /* Check if the keys are here. If at least one key is to migrate, do it;
4703      * otherwise, if all the keys are missing, reply with "NOKEY" to signal
4704      * the caller there was nothing to migrate. We don't return an error in
4705      * this case, since often this is due to a normal condition like the key
4706      * expiring in the meantime. */
4707     ov = zrealloc(ov,sizeof(robj*)*num_keys);
4708     kv = zrealloc(kv,sizeof(robj*)*num_keys);
4709     int oi = 0;
4710 
4711     for (j = 0; j < num_keys; j++) {
4712         if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
4713             kv[oi] = c->argv[first_key+j];
4714             oi++;
4715         }
4716     }
4717     num_keys = oi;
4718     if (num_keys == 0) {
4719         zfree(ov); zfree(kv);
4720         addReplySds(c,sdsnew("+NOKEY\r\n"));
4721         return;
4722     }
4723 
4724 try_again:
4725     write_error = 0;
4726 
4727     /* Connect */
4728     cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
4729     if (cs == NULL) {
4730         zfree(ov); zfree(kv);
4731         return; /* error sent to the client by migrateGetSocket() */
4732     }
4733 
4734     rioInitWithBuffer(&cmd,sdsempty());
4735 
4736     /* Send the SELECT command if the current DB is not already selected. */
4737     int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
4738     if (select) {
4739         serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
4740         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
4741         serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
4742     }
4743 
4744     /* Create RESTORE payload and generate the protocol to call the command. */
4745     for (j = 0; j < num_keys; j++) {
4746         expireat = getExpire(c->db,kv[j]);
4747         if (expireat != -1) {
4748             ttl = expireat-mstime();
4749             if (ttl < 1) ttl = 1;
4750         }
4751         serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
4752         if (server.cluster_enabled)
4753             serverAssertWithInfo(c,NULL,
4754                 rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
4755         else
4756             serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
4757         serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
4758         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
4759                 sdslen(kv[j]->ptr)));
4760         serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
4761 
4762         /* Emit the payload argument, that is the serialized object using
4763          * the DUMP format. */
4764         createDumpPayload(&payload,ov[j]);
4765         serverAssertWithInfo(c,NULL,
4766             rioWriteBulkString(&cmd,payload.io.buffer.ptr,
4767                                sdslen(payload.io.buffer.ptr)));
4768         sdsfree(payload.io.buffer.ptr);
4769 
4770         /* Add the REPLACE option to the RESTORE command if it was specified
4771          * as a MIGRATE option. */
4772         if (replace)
4773             serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
4774     }
4775 
4776     /* Transfer the query to the other node in 64K chunks. */
4777     errno = 0;
4778     {
4779         sds buf = cmd.io.buffer.ptr;
4780         size_t pos = 0, towrite;
4781         int nwritten = 0;
4782 
4783         while ((towrite = sdslen(buf)-pos) > 0) {
4784             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
4785             nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
4786             if (nwritten != (signed)towrite) {
4787                 write_error = 1;
4788                 goto socket_err;
4789             }
4790             pos += nwritten;
4791         }
4792     }
4793 
4794     char buf1[1024]; /* Select reply. */
4795     char buf2[1024]; /* Restore reply. */
4796 
4797     /* Read the SELECT reply if needed. */
4798     if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
4799         goto socket_err;
4800 
4801     /* Read the RESTORE replies. */
4802     int error_from_target = 0;
4803     int socket_error = 0;
4804     int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
4805 
4806     if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
4807 
4808     for (j = 0; j < num_keys; j++) {
4809         if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
4810             socket_error = 1;
4811             break;
4812         }
4813         if ((select && buf1[0] == '-') || buf2[0] == '-') {
4814             /* On error assume that last_dbid is no longer valid. */
4815             if (!error_from_target) {
4816                 cs->last_dbid = -1;
4817                 addReplyErrorFormat(c,"Target instance replied with error: %s",
4818                     (select && buf1[0] == '-') ? buf1+1 : buf2+1);
4819                 error_from_target = 1;
4820             }
4821         } else {
4822             if (!copy) {
4823                 /* No COPY option: remove the local key, signal the change. */
4824                 dbDelete(c->db,kv[j]);
4825                 signalModifiedKey(c->db,kv[j]);
4826                 server.dirty++;
4827 
4828                 /* Populate the argument vector to replace the old one. */
4829                 newargv[del_idx++] = kv[j];
4830                 incrRefCount(kv[j]);
4831             }
4832         }
4833     }
4834 
4835     /* On socket error, if we want to retry, do it now before rewriting the
4836      * command vector. We only retry if we are sure nothing was processed
4837      * and we failed to read the first reply (j == 0 test). */
4838     if (!error_from_target && socket_error && j == 0 && may_retry &&
4839         errno != ETIMEDOUT)
4840     {
4841         goto socket_err; /* A retry is guaranteed because of tested conditions.*/
4842     }
4843 
4844     if (!copy) {
4845         /* Translate MIGRATE as DEL for replication/AOF. */
4846         if (del_idx > 1) {
4847             newargv[0] = createStringObject("DEL",3);
4848             /* Note that the following call takes ownership of newargv. */
4849             replaceClientCommandVector(c,del_idx,newargv);
4850         } else {
4851             /* No key transfer acknowledged, no need to rewrite as DEL. */
4852             zfree(newargv);
4853         }
4854         newargv = NULL; /* Make it safe to call zfree() on it in the future. */
4855     }
4856 
4857     /* If we are here and a socket error happened, we don't want to retry.
4858      * Just signal the problem to the client, but only do it if we didn't
4859      * already queue a different error reported by the destination server. */
4860     if (!error_from_target && socket_error) {
4861         may_retry = 0;
4862         goto socket_err;
4863     }
4864 
4865     if (!error_from_target) {
4866         /* Success! Update the last_dbid in migrateCachedSocket, so that we can
4867          * avoid SELECT the next time if the target DB is the same. Reply +OK. */
4868         cs->last_dbid = dbid;
4869         addReply(c,shared.ok);
4870     } else {
4871         /* On error we already replied in the for loop above, and we set
4872          * cs->last_dbid to -1 to force a SELECT the next time. */
4873     }
4874 
4875     sdsfree(cmd.io.buffer.ptr);
4876     zfree(ov); zfree(kv); zfree(newargv);
4877     if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
4878     return;
4879 
4880 /* On socket errors we try to close the cached socket and try again.
4881  * It is very common for the cached socket to get closed; if simply reopening
4882  * it works, there is no reason to report the error to the caller. */
4883 socket_err:
4884     /* Cleanup we want to perform in both the retry and no retry case.
4885      * Note: Closing the migrate socket will also force SELECT next time. */
4886     sdsfree(cmd.io.buffer.ptr);
4887     migrateCloseSocket(c->argv[1],c->argv[2]);
4888     zfree(newargv);
4889     newargv = NULL; /* This will get reallocated on retry. */
4890 
4891     /* Retry only if it's not a timeout and we never attempted a retry
4892      * (or the code jumping here did not set may_retry to zero). */
4893     if (errno != ETIMEDOUT && may_retry) {
4894         may_retry = 0;
4895         goto try_again;
4896     }
4897 
4898     /* Cleanup we want to do if no retry is attempted. */
4899     zfree(ov); zfree(kv);
4900     addReplySds(c,
4901         sdscatprintf(sdsempty(),
4902             "-IOERR error or timeout %s to target instance\r\n",
4903             write_error ? "writing" : "reading"));
4904     return;
4905 }
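
/* Example invocations (host and port are made up). The second form moves
 * several keys over a single cached connection, pipelining one RESTORE
 * per key exactly as the loop above does:
 *
 * MIGRATE 192.168.1.10 6379 mykey 0 5000
 * MIGRATE 192.168.1.10 6379 "" 0 5000 KEYS key1 key2 key3
 */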
4906 
4907 /* -----------------------------------------------------------------------------
4908  * Cluster functions related to serving / redirecting clients
4909  * -------------------------------------------------------------------------- */
4910 
4911 /* The ASKING command is required after a -ASK redirection.
4912  * The client should issue ASKING before actually sending the command to
4913  * the target instance. See the Redis Cluster specification for more
4914  * information. */
4915 void askingCommand(client *c) {
4916     if (server.cluster_enabled == 0) {
4917         addReplyError(c,"This instance has cluster support disabled");
4918         return;
4919     }
4920     c->flags |= CLIENT_ASKING;
4921     addReply(c,shared.ok);
4922 }
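
/* The client-side flow after an -ASK redirection, for reference (address
 * and slot are example values):
 *
 * GET foo              -> -ASK 3999 127.0.0.1:6381
 * (client connects to 127.0.0.1:6381)
 * ASKING               -> +OK   (sets CLIENT_ASKING)
 * GET foo              -> served once despite the slot not being assigned
 *
 * The CLIENT_ASKING flag is a one-shot: it is cleared after the next
 * command, so ASKING must precede every redirected request. */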
4923 
4924 /* The READONLY command is used by clients to enter the read-only mode.
4925  * In this mode slaves will not redirect clients as long as clients use
4926  * read-only commands to access keys that are served by the slave's master. */
4927 void readonlyCommand(client *c) {
4928     if (server.cluster_enabled == 0) {
4929         addReplyError(c,"This instance has cluster support disabled");
4930         return;
4931     }
4932     c->flags |= CLIENT_READONLY;
4933     addReply(c,shared.ok);
4934 }
4935 
4936 /* The READWRITE command just clears the READONLY command state. */
4937 void readwriteCommand(client *c) {
4938     c->flags &= ~CLIENT_READONLY;
4939     addReply(c,shared.ok);
4940 }
4941 
4942 /* Return the pointer to the cluster node that is able to serve the command.
4943  * For the function to succeed the command should only target either:
4944  *
4945  * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
4946  * 2) Multiple keys in the same hash slot, while the slot is stable (no
4947  *    resharding in progress).
4948  *
4949  * On success the function returns the node that is able to serve the request.
4950  * If the node is not 'myself' a redirection must be performed. The kind of
4951  * redirection is specified setting the integer passed by reference
4952  * 'error_code', which will be set to CLUSTER_REDIR_ASK or
4953  * CLUSTER_REDIR_MOVED.
4954  *
4955  * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
4956  *
4957  * If the command fails NULL is returned, and the reason of the failure is
4958  * provided via 'error_code', which will be set to:
4959  *
4960  * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
4961  * don't belong to the same hash slot.
4962  *
4963  * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
4964  * belonging to the same slot, but the slot is not stable (in migration or
4965  * importing state, likely because a resharding is in progress).
4966  *
4967  * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
4968  * not bound to any node. In this case the cluster global state should be
4969  * already "down" but it is fragile to rely on the update of the global state,
4970  * so we also handle it here.
4971  *
4972  * CLUSTER_REDIR_DOWN_STATE if the cluster is down but the user attempts to
4973  * execute a command that addresses one or more keys. */
4974 clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
4975     clusterNode *n = NULL;
4976     robj *firstkey = NULL;
4977     int multiple_keys = 0;
4978     multiState *ms, _ms;
4979     multiCmd mc;
4980     int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
4981 
4982     /* Set error code optimistically for the base case. */
4983     if (error_code) *error_code = CLUSTER_REDIR_NONE;
4984 
4985     /* We handle all the cases as if they were EXEC commands, so we have
4986      * a common code path for everything */
4987     if (cmd->proc == execCommand) {
4988         /* If CLIENT_MULTI flag is not set EXEC is just going to return an
4989          * error. */
4990         if (!(c->flags & CLIENT_MULTI)) return myself;
4991         ms = &c->mstate;
4992     } else {
4993         /* If the client is not in MULTI/EXEC state, create a fake multi
4994          * state structure holding this single command, so that the code
4995          * below has one code path to handle. */
4996         ms = &_ms;
4997         _ms.commands = &mc;
4998         _ms.count = 1;
4999         mc.argv = argv;
5000         mc.argc = argc;
5001         mc.cmd = cmd;
5002     }
5003 
5004     /* Check that all the keys are in the same hash slot, and obtain this
5005      * slot and the node associated. */
5006     for (i = 0; i < ms->count; i++) {
5007         struct redisCommand *mcmd;
5008         robj **margv;
5009         int margc, *keyindex, numkeys, j;
5010 
5011         mcmd = ms->commands[i].cmd;
5012         margc = ms->commands[i].argc;
5013         margv = ms->commands[i].argv;
5014 
5015         keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
5016         for (j = 0; j < numkeys; j++) {
5017             robj *thiskey = margv[keyindex[j]];
5018             int thisslot = keyHashSlot((char*)thiskey->ptr,
5019                                        sdslen(thiskey->ptr));
5020 
5021             if (firstkey == NULL) {
5022                 /* This is the first key we see. Check what its slot and
5023                  * the associated node are. */
5024                 firstkey = thiskey;
5025                 slot = thisslot;
5026                 n = server.cluster->slots[slot];
5027 
5028                 /* Error: If a slot is not served, we are in "cluster down"
5029                  * state. However the state is yet to be updated, so this was
5030                  * not trapped earlier in processCommand(). Report the same
5031                  * error to the client. */
5032                 if (n == NULL) {
5033                     getKeysFreeResult(keyindex);
5034                     if (error_code)
5035                         *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
5036                     return NULL;
5037                 }
5038 
5039                 /* If we are migrating or importing this slot, we need to check
5040                  * if we have all the keys in the request (the only way we
5041                  * can safely serve the request, otherwise we return a TRYAGAIN
5042                  * error). To do so we set the importing/migrating state and
5043                  * increment a counter for every missing key. */
5044                 if (n == myself &&
5045                     server.cluster->migrating_slots_to[slot] != NULL)
5046                 {
5047                     migrating_slot = 1;
5048                 } else if (server.cluster->importing_slots_from[slot] != NULL) {
5049                     importing_slot = 1;
5050                 }
5051             } else {
5052                 /* If it is not the first key, make sure it is exactly
5053                  * the same key as the first we saw. */
5054                 if (!equalStringObjects(firstkey,thiskey)) {
5055                     if (slot != thisslot) {
5056                         /* Error: multiple keys from different slots. */
5057                         getKeysFreeResult(keyindex);
5058                         if (error_code)
5059                             *error_code = CLUSTER_REDIR_CROSS_SLOT;
5060                         return NULL;
5061                     } else {
5062                         /* Flag this request as one with multiple different
5063                          * keys. */
5064                         multiple_keys = 1;
5065                     }
5066                 }
5067             }
5068 
5069             /* Migrating / importing slot? Count the keys we don't have. */
5070             if ((migrating_slot || importing_slot) &&
5071                 lookupKeyRead(&server.db[0],thiskey) == NULL)
5072             {
5073                 missing_keys++;
5074             }
5075         }
5076         getKeysFreeResult(keyindex);
5077     }
5078 
5079     /* No keys at all in the command? Then we can serve the request
5080      * without redirections or errors in all cases. */
5081     if (n == NULL) return myself;
5082 
5083     /* Cluster is globally down but we got keys? We can't serve the request. */
5084     if (server.cluster->state != CLUSTER_OK) {
5085         if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
5086         return NULL;
5087     }
5088 
5089     /* Return the hashslot by reference. */
5090     if (hashslot) *hashslot = slot;
5091 
5092     /* MIGRATE always works in the context of the local node if the slot
5093      * is open (migrating or importing state). We need to be able to freely
5094      * move keys among instances in this case. */
5095     if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
5096         return myself;
5097 
5098     /* If we don't have all the keys and we are migrating the slot, send
5099      * an ASK redirection. */
5100     if (migrating_slot && missing_keys) {
5101         if (error_code) *error_code = CLUSTER_REDIR_ASK;
5102         return server.cluster->migrating_slots_to[slot];
5103     }
5104 
5105     /* If we are receiving the slot, and the client correctly flagged the
5106      * request as "ASKING", we can serve the request. However if the request
5107      * involves multiple keys and we don't have them all, the only option is
5108      * to send a TRYAGAIN error. */
5109     if (importing_slot &&
5110         (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
5111     {
5112         if (multiple_keys && missing_keys) {
5113             if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
5114             return NULL;
5115         } else {
5116             return myself;
5117         }
5118     }
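
     /* Note that CLIENT_ASKING is a one-shot flag: it is set by the ASKING
      * command and cleared once the next command has been processed, so
      * clients must send ASKING before every query redirected with -ASK. */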
5119 
5120     /* Handle the read-only client case reading from a slave: if this
5121      * node is a slave and the request is about a hash slot our master
5122      * is serving, we can reply without redirection. */
5123     if (c->flags & CLIENT_READONLY &&
5124         cmd->flags & CMD_READONLY &&
5125         nodeIsSlave(myself) &&
5126         myself->slaveof == n)
5127     {
5128         return myself;
5129     }
5130 
5131     /* Base case: just return the right node. However if this node is not
5132      * myself, set error_code to MOVED since we need to issue a redirection. */
5133     if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
5134     return n;
5135 }
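
     /* Illustrative sketch (not part of this file): the command dispatcher
      * processCommand() in server.c typically drives the function above
      * roughly as follows, redirecting the client whenever the returned
      * node is not this one:
      *
      *     int hashslot, error_code;
      *     clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
      *                                     &hashslot,&error_code);
      *     if (n == NULL || n != server.cluster->myself) {
      *         clusterRedirectClient(c,n,hashslot,error_code);
      *         return C_OK;
      *     }
      */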
5136 
5137 /* Send the client the right redirection code, according to error_code
5138  * that should be set to one of CLUSTER_REDIR_* macros.
5139  *
5140  * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
5141  * are used, then the node 'n' should not be NULL, but should be the
5142  * node we want to mention in the redirection. Moreover hashslot should
5143  * be set to the hash slot that caused the redirection. */
5144 void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
5145     if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
5146         addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
5147     } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
5148         /* The request spans multiple keys in the same slot,
5149          * but the slot is currently not "stable" as there is
5150          * a migration or import in progress. */
5151         addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
5152     } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
5153         addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
5154     } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
5155         addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
5156     } else if (error_code == CLUSTER_REDIR_MOVED ||
5157                error_code == CLUSTER_REDIR_ASK)
5158     {
5159         addReplySds(c,sdscatprintf(sdsempty(),
5160             "-%s %d %s:%d\r\n",
5161             (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
5162             hashslot,n->ip,n->port));
5163     } else {
5164         serverPanic("getNodeByQuery() unknown error.");
5165     }
5166 }
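
     /* For reference, the redirection replies generated above look like
      * this on the wire (addresses are illustrative):
      *
      *     -MOVED 3999 127.0.0.1:6381
      *     -ASK 3999 127.0.0.1:6381
      *
      * On -MOVED the client should update its slots map and retry at the
      * target node. On -ASK it should retry only the next query at the
      * target node, preceded by an ASKING command. */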
5167 
5168 /* This function is called by the code that incrementally scans clients
5169  * to detect timeouts, in order to handle the following case:
5170  *
5171  * 1) A client blocks with BLPOP or similar blocking operation.
5172  * 2) The master migrates the hash slot elsewhere or turns into a slave.
5173  * 3) The client may remain blocked forever (or up to the max timeout time)
5174  *    waiting for a key change that will never happen.
5175  *
5176  * If the client is found to be blocked on a hash slot this node no
5177  * longer handles, the client is sent a redirection error, and the function
5178  * returns 1. Otherwise 0 is returned and no operation is performed. */
5179 int clusterRedirectBlockedClientIfNeeded(client *c) {
5180     if (c->flags & CLIENT_BLOCKED && c->btype == BLOCKED_LIST) {
5181         dictEntry *de;
5182         dictIterator *di;
5183 
5184         /* If the cluster is down, unblock the client with the right error. */
5185         if (server.cluster->state == CLUSTER_FAIL) {
5186             clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
5187             return 1;
5188         }
5189 
5190         di = dictGetIterator(c->bpop.keys);
5191         while((de = dictNext(di)) != NULL) {
5192             robj *key = dictGetKey(de);
5193             int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
5194             clusterNode *node = server.cluster->slots[slot];
5195 
5196             /* We send an error and unblock the client if:
5197              * 1) The slot is unassigned, emitting a cluster down error.
5198              * 2) The slot is not handled by this node, nor being imported. */
5199             if (node != myself &&
5200                 server.cluster->importing_slots_from[slot] == NULL)
5201             {
5202                 if (node == NULL) {
5203                     clusterRedirectClient(c,NULL,0,
5204                         CLUSTER_REDIR_DOWN_UNBOUND);
5205                 } else {
5206                     clusterRedirectClient(c,node,slot,
5207                         CLUSTER_REDIR_MOVED);
5208                 }
5209                 dictReleaseIterator(di); /* Don't leak the iterator. */
5210                 return 1;
5211             }
5212         }
5213         dictReleaseIterator(di);
5214     }
5215     return 0;
5216 }
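
     /* Illustrative sketch (not part of this file): the clients cron in
      * server.c calls the function above for each blocked client when
      * cluster mode is enabled, unblocking the ones that received a
      * redirection, roughly:
      *
      *     if (server.cluster_enabled &&
      *         clusterRedirectBlockedClientIfNeeded(c))
      *     {
      *         unblockClient(c);
      *     }
      */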
5216