xref: /f-stack/app/redis-5.0.5/src/cluster.c (revision 572c4311)
1 /* Redis Cluster implementation.
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #include "server.h"
32 #include "cluster.h"
33 #include "endianconv.h"
34 
35 #include <sys/types.h>
36 #include <sys/socket.h>
37 #include <arpa/inet.h>
38 #include <fcntl.h>
39 #include <unistd.h>
40 #include <sys/stat.h>
41 #include <sys/file.h>
42 #include <math.h>
43 
44 /* A global reference to myself is handy to make code more clear.
45  * Myself always points to server.cluster->myself, that is, the clusterNode
46  * that represents this node. */
47 clusterNode *myself = NULL;
48 
49 clusterNode *createClusterNode(char *nodename, int flags);
50 int clusterAddNode(clusterNode *node);
51 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
52 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
53 void clusterSendPing(clusterLink *link, int type);
54 void clusterSendFail(char *nodename);
55 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
56 void clusterUpdateState(void);
57 int clusterNodeGetSlotBit(clusterNode *n, int slot);
58 sds clusterGenNodesDescription(int filter);
59 clusterNode *clusterLookupNode(const char *name);
60 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
61 int clusterAddSlot(clusterNode *n, int slot);
62 int clusterDelSlot(int slot);
63 int clusterDelNodeSlots(clusterNode *node);
64 int clusterNodeSetSlotBit(clusterNode *n, int slot);
65 void clusterSetMaster(clusterNode *n);
66 void clusterHandleSlaveFailover(void);
67 void clusterHandleSlaveMigration(int max_slaves);
68 int bitmapTestBit(unsigned char *bitmap, int pos);
69 void clusterDoBeforeSleep(int flags);
70 void clusterSendUpdate(clusterLink *link, clusterNode *node);
71 void resetManualFailover(void);
72 void clusterCloseAllSlots(void);
73 void clusterSetNodeAsMaster(clusterNode *n);
74 void clusterDelNode(clusterNode *delnode);
75 sds representClusterNodeFlags(sds ci, uint16_t flags);
76 uint64_t clusterGetMaxEpoch(void);
77 int clusterBumpConfigEpochWithoutConsensus(void);
78 void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
79 
80 /* -----------------------------------------------------------------------------
81  * Initialization
82  * -------------------------------------------------------------------------- */
83 
84 /* Load the cluster config from 'filename'.
85  *
86  * If the file does not exist or is zero-length (the latter may happen
87  * because, when we lock the nodes.conf file, we create a zero-length one
88  * for the sake of locking if it does not already exist), C_ERR is returned.
89  * If the configuration was loaded from the file, C_OK is returned. */
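/* An illustrative (hypothetical) nodes.conf line, in the same format as the
 * CLUSTER NODES output (id, address, flags, master, ping-sent,
 * pong-received, config-epoch, link-state, slots...):
 *
 *   07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@40004 slave
 *       e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected
 */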
90 int clusterLoadConfig(char *filename) {
91     FILE *fp = fopen(filename,"r");
92     struct stat sb;
93     char *line;
94     int maxline, j;
95 
96     if (fp == NULL) {
97         if (errno == ENOENT) {
98             return C_ERR;
99         } else {
100             serverLog(LL_WARNING,
101                 "Loading the cluster node config from %s: %s",
102                 filename, strerror(errno));
103             exit(1);
104         }
105     }
106 
107     /* Check if the file is zero-length: if so return C_ERR to signal
108      * we have to write the config. */
109     if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
110         fclose(fp);
111         return C_ERR;
112     }
113 
114     /* Parse the file. Note that single lines of the cluster config file can
115      * be really long as they include all the hash slots of the node.
116      * This means in the worst possible case, half of the Redis slots will be
117  * present in a single line, possibly in importing or migrating state,
118  * together with the node ID of the sender/receiver.
119      *
120      * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
121     maxline = 1024+CLUSTER_SLOTS*128;
122     line = zmalloc(maxline);
123     while(fgets(line,maxline,fp) != NULL) {
124         int argc;
125         sds *argv;
126         clusterNode *n, *master;
127         char *p, *s;
128 
129         /* Skip blank lines, they can be created either by users manually
130          * editing nodes.conf or by the config writing process if stopped
131          * before the truncate() call. */
132         if (line[0] == '\n' || line[0] == '\0') continue;
133 
134         /* Split the line into arguments for processing. */
135         argv = sdssplitargs(line,&argc);
136         if (argv == NULL) goto fmterr;
137 
138         /* Handle the special "vars" line. Don't assume it is the last
139          * line even if it actually is when generated by Redis. */
140         if (strcasecmp(argv[0],"vars") == 0) {
141             for (j = 1; j < argc; j += 2) {
142                 if (strcasecmp(argv[j],"currentEpoch") == 0) {
143                     server.cluster->currentEpoch =
144                             strtoull(argv[j+1],NULL,10);
145                 } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
146                     server.cluster->lastVoteEpoch =
147                             strtoull(argv[j+1],NULL,10);
148                 } else {
149                     serverLog(LL_WARNING,
150                         "Skipping unknown cluster config variable '%s'",
151                         argv[j]);
152                 }
153             }
154             sdsfreesplitres(argv,argc);
155             continue;
156         }
157 
158         /* Regular config lines have at least eight fields */
159         if (argc < 8) goto fmterr;
160 
161         /* Create this node if it does not exist */
162         n = clusterLookupNode(argv[0]);
163         if (!n) {
164             n = createClusterNode(argv[0],0);
165             clusterAddNode(n);
166         }
167         /* Address and port */
168         if ((p = strrchr(argv[1],':')) == NULL) goto fmterr;
169         *p = '\0';
170         memcpy(n->ip,argv[1],strlen(argv[1])+1);
171         char *port = p+1;
172         char *busp = strchr(port,'@');
173         if (busp) {
174             *busp = '\0';
175             busp++;
176         }
177         n->port = atoi(port);
178         /* In older versions of nodes.conf the "@busport" part is missing.
179          * In this case we set it to the default offset of 10000 from the
180          * base port. */
181         n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
182 
183         /* Parse flags */
184         p = s = argv[2];
185         while(p) {
186             p = strchr(s,',');
187             if (p) *p = '\0';
188             if (!strcasecmp(s,"myself")) {
189                 serverAssert(server.cluster->myself == NULL);
190                 myself = server.cluster->myself = n;
191                 n->flags |= CLUSTER_NODE_MYSELF;
192             } else if (!strcasecmp(s,"master")) {
193                 n->flags |= CLUSTER_NODE_MASTER;
194             } else if (!strcasecmp(s,"slave")) {
195                 n->flags |= CLUSTER_NODE_SLAVE;
196             } else if (!strcasecmp(s,"fail?")) {
197                 n->flags |= CLUSTER_NODE_PFAIL;
198             } else if (!strcasecmp(s,"fail")) {
199                 n->flags |= CLUSTER_NODE_FAIL;
200                 n->fail_time = mstime();
201             } else if (!strcasecmp(s,"handshake")) {
202                 n->flags |= CLUSTER_NODE_HANDSHAKE;
203             } else if (!strcasecmp(s,"noaddr")) {
204                 n->flags |= CLUSTER_NODE_NOADDR;
205             } else if (!strcasecmp(s,"nofailover")) {
206                 n->flags |= CLUSTER_NODE_NOFAILOVER;
207             } else if (!strcasecmp(s,"noflags")) {
208                 /* nothing to do */
209             } else {
210                 serverPanic("Unknown flag in redis cluster config file");
211             }
212             if (p) s = p+1;
213         }
214 
215         /* Get master if any. Set the master and populate master's
216          * slave list. */
217         if (argv[3][0] != '-') {
218             master = clusterLookupNode(argv[3]);
219             if (!master) {
220                 master = createClusterNode(argv[3],0);
221                 clusterAddNode(master);
222             }
223             n->slaveof = master;
224             clusterNodeAddSlave(master,n);
225         }
226 
227         /* Set ping sent / pong received timestamps */
228         if (atoi(argv[4])) n->ping_sent = mstime();
229         if (atoi(argv[5])) n->pong_received = mstime();
230 
231         /* Set configEpoch for this node. */
232         n->configEpoch = strtoull(argv[6],NULL,10);
233 
234         /* Populate hash slots served by this instance. */
235         for (j = 8; j < argc; j++) {
236             int start, stop;
237 
238             if (argv[j][0] == '[') {
239                 /* Here we handle migrating / importing slots */
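                /* Illustrative syntax (hypothetical, node IDs shortened):
                 * an entry like "[93->-07c37df...]" means slot 93 is
                 * migrating to that node, while "[93-<-07c37df...]" means
                 * slot 93 is being imported from it. */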
240                 int slot;
241                 char direction;
242                 clusterNode *cn;
243 
244                 p = strchr(argv[j],'-');
245                 serverAssert(p != NULL);
246                 *p = '\0';
247                 direction = p[1]; /* Either '>' or '<' */
248                 slot = atoi(argv[j]+1);
249                 if (slot < 0 || slot >= CLUSTER_SLOTS) goto fmterr;
250                 p += 3;
251                 cn = clusterLookupNode(p);
252                 if (!cn) {
253                     cn = createClusterNode(p,0);
254                     clusterAddNode(cn);
255                 }
256                 if (direction == '>') {
257                     server.cluster->migrating_slots_to[slot] = cn;
258                 } else {
259                     server.cluster->importing_slots_from[slot] = cn;
260                 }
261                 continue;
262             } else if ((p = strchr(argv[j],'-')) != NULL) {
263                 *p = '\0';
264                 start = atoi(argv[j]);
265                 stop = atoi(p+1);
266             } else {
267                 start = stop = atoi(argv[j]);
268             }
269             if (start < 0 || start >= CLUSTER_SLOTS) goto fmterr;
270             if (stop < 0 || stop >= CLUSTER_SLOTS) goto fmterr;
271             while(start <= stop) clusterAddSlot(n, start++);
272         }
273 
274         sdsfreesplitres(argv,argc);
275     }
276     /* Config sanity check */
277     if (server.cluster->myself == NULL) goto fmterr;
278 
279     zfree(line);
280     fclose(fp);
281 
282     serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);
283 
284     /* Something that should never happen: currentEpoch smaller than
285      * the max epoch found in the nodes configuration. However we handle this
286      * as some form of protection against manual editing of critical files. */
287     if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
288         server.cluster->currentEpoch = clusterGetMaxEpoch();
289     }
290     return C_OK;
291 
292 fmterr:
293     serverLog(LL_WARNING,
294         "Unrecoverable error: corrupted cluster config file.");
295     zfree(line);
296     if (fp) fclose(fp);
297     exit(1);
298 }
299 
300 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
301  *
302  * This function writes the node config and returns 0, on error -1
303  * is returned.
304  *
305  * Note: we need to write the file in an atomic way from the point of view
306  * of the POSIX filesystem semantics, so that if the server is stopped
307  * or crashes during the write, we'll end with either the old file or the
308  * new one. Since we have the full payload to write available we can use
309  * a single write to write the whole file. If the pre-existing file was
310  * bigger we pad our payload with newlines that are anyway ignored and truncate
311  * the file afterward. */
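/* A worked example with illustrative numbers: if the pre-existing file is
 * 200 bytes and the new payload is 150 bytes, we write the 150 payload
 * bytes plus 50 padding newlines with a single write(2), then ftruncate()
 * the file back to 150 bytes, so we end up with either the old or the new
 * content, never a partially written config. */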
312 int clusterSaveConfig(int do_fsync) {
313     sds ci;
314     size_t content_size;
315     struct stat sb;
316     int fd;
317 
318     server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
319 
320     /* Get the nodes description and concatenate our "vars" directive to
321      * save currentEpoch and lastVoteEpoch. */
322     ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
323     ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
324         (unsigned long long) server.cluster->currentEpoch,
325         (unsigned long long) server.cluster->lastVoteEpoch);
326     content_size = sdslen(ci);
327 
328     if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
329         == -1) goto err;
330 
331     /* Pad the new payload if the existing file length is greater. */
332     if (fstat(fd,&sb) != -1) {
333         if (sb.st_size > (off_t)content_size) {
334             ci = sdsgrowzero(ci,sb.st_size);
335             memset(ci+content_size,'\n',sb.st_size-content_size);
336         }
337     }
338     if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
339     if (do_fsync) {
340         server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
341         fsync(fd);
342     }
343 
344     /* Truncate the file if needed to remove the final \n padding that
345      * is just garbage. */
346     if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
347         /* ftruncate() failing is not a critical error. */
348     }
349     close(fd);
350     sdsfree(ci);
351     return 0;
352 
353 err:
354     if (fd != -1) close(fd);
355     sdsfree(ci);
356     return -1;
357 }
358 
359 void clusterSaveConfigOrDie(int do_fsync) {
360     if (clusterSaveConfig(do_fsync) == -1) {
361         serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
362         exit(1);
363     }
364 }
365 
366 /* Lock the cluster config using flock(), and leak the file descriptor used to
367  * acquire the lock so that the file will be locked forever.
368  *
369  * This works because we always update nodes.conf with a new version
370  * in-place, reopening the file and writing to it (later adjusting
371  * the length with ftruncate()).
372  *
373  * On success C_OK is returned, otherwise an error is logged and
374  * the function returns C_ERR to signal a lock was not acquired. */
375 int clusterLockConfig(char *filename) {
376 /* flock() does not exist on Solaris
377  * and a fcntl-based solution won't help, as we constantly re-open that file,
378  * which will release _all_ locks anyway
379  */
380 #if !defined(__sun)
381     /* To lock it, we need to open the file in a way it is created if
382      * it does not exist, otherwise there is a race condition with other
383      * processes. */
384     int fd = open(filename,O_WRONLY|O_CREAT,0644);
385     if (fd == -1) {
386         serverLog(LL_WARNING,
387             "Can't open %s in order to acquire a lock: %s",
388             filename, strerror(errno));
389         return C_ERR;
390     }
391 
392     if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
393         if (errno == EWOULDBLOCK) {
394             serverLog(LL_WARNING,
395                  "Sorry, the cluster configuration file %s is already used "
396                  "by a different Redis Cluster node. Please make sure that "
397                  "different nodes use different cluster configuration "
398                  "files.", filename);
399         } else {
400             serverLog(LL_WARNING,
401                 "Impossible to lock %s: %s", filename, strerror(errno));
402         }
403         close(fd);
404         return C_ERR;
405     }
406     /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
407      * lock on the file as long as the process exists. */
408 #endif /* __sun */
409 
410     return C_OK;
411 }
412 
413 /* Some flags (currently just the NOFAILOVER flag) may need to be updated
414  * in the "myself" node based on the current configuration of the node,
415  * that may change at runtime via CONFIG SET. This function changes the
416  * set of flags in myself->flags accordingly. */
417 void clusterUpdateMyselfFlags(void) {
418     int oldflags = myself->flags;
419     int nofailover = server.cluster_slave_no_failover ?
420                      CLUSTER_NODE_NOFAILOVER : 0;
421     myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
422     myself->flags |= nofailover;
423     if (myself->flags != oldflags) {
424         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
425                              CLUSTER_TODO_UPDATE_STATE);
426     }
427 }
428 
429 void clusterInit(void) {
430     int saveconf = 0;
431 
432     server.cluster = zmalloc(sizeof(clusterState));
433     server.cluster->myself = NULL;
434     server.cluster->currentEpoch = 0;
435     server.cluster->state = CLUSTER_FAIL;
436     server.cluster->size = 1;
437     server.cluster->todo_before_sleep = 0;
438     server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
439     server.cluster->nodes_black_list =
440         dictCreate(&clusterNodesBlackListDictType,NULL);
441     server.cluster->failover_auth_time = 0;
442     server.cluster->failover_auth_count = 0;
443     server.cluster->failover_auth_rank = 0;
444     server.cluster->failover_auth_epoch = 0;
445     server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
446     server.cluster->lastVoteEpoch = 0;
447     for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
448         server.cluster->stats_bus_messages_sent[i] = 0;
449         server.cluster->stats_bus_messages_received[i] = 0;
450     }
451     server.cluster->stats_pfail_nodes = 0;
452     memset(server.cluster->slots,0, sizeof(server.cluster->slots));
453     clusterCloseAllSlots();
454 
455     /* Lock the cluster config file to make sure every node uses
456      * its own nodes.conf. */
457     if (clusterLockConfig(server.cluster_configfile) == C_ERR)
458         exit(1);
459 
460     /* Load or create a new nodes configuration. */
461     if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
462         /* No configuration found. We will just use the random name provided
463          * by the createClusterNode() function. */
464         myself = server.cluster->myself =
465             createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
466         serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
467             myself->name);
468         clusterAddNode(myself);
469         saveconf = 1;
470     }
471     if (saveconf) clusterSaveConfigOrDie(1);
472 
473     /* We need a listening TCP port for our cluster messaging needs. */
474     server.cfd_count = 0;
475 
476     /* Port sanity check II
477      * The other handshake port check is triggered too late to stop
478      * us from trying to use a too-high cluster port number. */
479     if (server.port > (65535-CLUSTER_PORT_INCR)) {
480         serverLog(LL_WARNING, "Redis port number too high. "
481                    "Cluster communication port is 10,000 port "
482                    "numbers higher than your Redis port. "
483                    "Your Redis port number must be "
484                    "lower than 55535.");
485         exit(1);
486     }
487 
488     if (listenToPort(server.port+CLUSTER_PORT_INCR,
489         server.cfd,&server.cfd_count) == C_ERR)
490     {
491         exit(1);
492     } else {
493         int j;
494 
495         for (j = 0; j < server.cfd_count; j++) {
496             if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
497                 clusterAcceptHandler, NULL) == AE_ERR)
498                     serverPanic("Unrecoverable error creating Redis Cluster "
499                                 "file event.");
500         }
501     }
502 
503     /* The slots -> keys map is a radix tree. Initialize it here. */
504     server.cluster->slots_to_keys = raxNew();
505     memset(server.cluster->slots_keys_count,0,
506            sizeof(server.cluster->slots_keys_count));
507 
508     /* Set myself->port / cport to my listening ports; we'll just need to
509      * discover the IP address via MEET messages. */
510     myself->port = server.port;
511     myself->cport = server.port+CLUSTER_PORT_INCR;
512     if (server.cluster_announce_port)
513         myself->port = server.cluster_announce_port;
514     if (server.cluster_announce_bus_port)
515         myself->cport = server.cluster_announce_bus_port;
516 
517     server.cluster->mf_end = 0;
518     resetManualFailover();
519     clusterUpdateMyselfFlags();
520 }
521 
522 /* Reset a node performing a soft or hard reset:
523  *
524  * 1) All other nodes are forgotten.
525  * 2) All the assigned / open slots are released.
526  * 3) If the node is a slave, it turns into a master.
527  * 4) Only for hard reset: a new Node ID is generated.
528  * 5) Only for hard reset: currentEpoch and configEpoch are set to 0.
529  * 6) The new configuration is saved and the cluster state updated.
530  * 7) If the node was a slave, the whole data set is flushed away. */
531 void clusterReset(int hard) {
532     dictIterator *di;
533     dictEntry *de;
534     int j;
535 
536     /* Turn into master. */
537     if (nodeIsSlave(myself)) {
538         clusterSetNodeAsMaster(myself);
539         replicationUnsetMaster();
540         emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
541     }
542 
543     /* Close slots, reset manual failover state. */
544     clusterCloseAllSlots();
545     resetManualFailover();
546 
547     /* Unassign all the slots. */
548     for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);
549 
550     /* Forget all the nodes but myself. */
551     di = dictGetSafeIterator(server.cluster->nodes);
552     while((de = dictNext(di)) != NULL) {
553         clusterNode *node = dictGetVal(de);
554 
555         if (node == myself) continue;
556         clusterDelNode(node);
557     }
558     dictReleaseIterator(di);
559 
560     /* Hard reset only: set epochs to 0, change node ID. */
561     if (hard) {
562         sds oldname;
563 
564         server.cluster->currentEpoch = 0;
565         server.cluster->lastVoteEpoch = 0;
566         myself->configEpoch = 0;
567         serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");
568 
569         /* To change the Node ID we need to remove the old name from the
570          * nodes table, change the ID, and re-add it back with the new name. */
571         oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
572         dictDelete(server.cluster->nodes,oldname);
573         sdsfree(oldname);
574         getRandomHexChars(myself->name, CLUSTER_NAMELEN);
575         clusterAddNode(myself);
576         serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
577     }
578 
579     /* Make sure to persist the new config and update the state. */
580     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
581                          CLUSTER_TODO_UPDATE_STATE|
582                          CLUSTER_TODO_FSYNC_CONFIG);
583 }
584 
585 /* -----------------------------------------------------------------------------
586  * CLUSTER communication link
587  * -------------------------------------------------------------------------- */
588 
589 clusterLink *createClusterLink(clusterNode *node) {
590     clusterLink *link = zmalloc(sizeof(*link));
591     link->ctime = mstime();
592     link->sndbuf = sdsempty();
593     link->rcvbuf = sdsempty();
594     link->node = node;
595     link->fd = -1;
596     return link;
597 }
598 
599 /* Free a cluster link. This does not free the associated node, of course.
600  * This function will just make sure that the original node associated
601  * with this link will have the 'link' field set to NULL. */
602 void freeClusterLink(clusterLink *link) {
603     if (link->fd != -1) {
604         aeDeleteFileEvent(server.el, link->fd, AE_READABLE|AE_WRITABLE);
605     }
606     sdsfree(link->sndbuf);
607     sdsfree(link->rcvbuf);
608     if (link->node)
609         link->node->link = NULL;
610     close(link->fd);
611     zfree(link);
612 }
613 
614 #define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
615 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
616     int cport, cfd;
617     int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
618     char cip[NET_IP_STR_LEN];
619     clusterLink *link;
620     UNUSED(el);
621     UNUSED(mask);
622     UNUSED(privdata);
623 
624     /* If the server is starting up, don't accept cluster connections:
625      * UPDATE messages may interact with the database content. */
626     if (server.masterhost == NULL && server.loading) return;
627 
628     while(max--) {
629         cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
630         if (cfd == ANET_ERR) {
631             if (errno != EWOULDBLOCK)
632                 serverLog(LL_VERBOSE,
633                     "Error accepting cluster node: %s", server.neterr);
634             return;
635         }
636         anetNonBlock(NULL,cfd);
637         anetEnableTcpNoDelay(NULL,cfd);
638 
639         /* Use non-blocking I/O for cluster messages. */
640         serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
641         /* Create a link object we use to handle the connection.
642          * It gets passed to the readable handler when data is available.
643  * Initially the link->node pointer is set to NULL as we don't know
644  * which node it is, but the right node is referenced once we know the
645          * node identity. */
646         link = createClusterLink(NULL);
647         link->fd = cfd;
648         aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
649     }
650 }
651 
652 /* -----------------------------------------------------------------------------
653  * Key space handling
654  * -------------------------------------------------------------------------- */
655 
656 /* We have 16384 hash slots. The hash slot of a given key is obtained
657  * as the least significant 14 bits of the crc16 of the key.
658  *
659  * However if the key contains the {...} pattern, only the part between
660  * { and } is hashed. This may be useful in the future to force certain
661  * keys to be in the same node (assuming no resharding is in progress). */
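/* For example (hypothetical keys), "{user1000}.following" and
 * "{user1000}.followers" hash only the "user1000" substring between the
 * braces, so both keys are guaranteed to map to the same hash slot:
 *
 *   keyHashSlot("{user1000}.following",20) == keyHashSlot("{user1000}.followers",20)
 */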
662 unsigned int keyHashSlot(char *key, int keylen) {
663     int s, e; /* start-end indexes of { and } */
664 
665     for (s = 0; s < keylen; s++)
666         if (key[s] == '{') break;
667 
668     /* No '{' ? Hash the whole key. This is the base case. */
669     if (s == keylen) return crc16(key,keylen) & 0x3FFF;
670 
671     /* '{' found? Check if we have the corresponding '}'. */
672     for (e = s+1; e < keylen; e++)
673         if (key[e] == '}') break;
674 
675     /* No '}' or nothing between {} ? Hash the whole key. */
676     if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
677 
678     /* If we are here there is both a { and a } on its right. Hash
679      * what is in the middle between { and }. */
680     return crc16(key+s+1,e-s-1) & 0x3FFF;
681 }
682 
683 /* -----------------------------------------------------------------------------
684  * CLUSTER node API
685  * -------------------------------------------------------------------------- */
686 
687 /* Create a new cluster node, with the specified flags.
688  * If "nodename" is NULL this is considered a first handshake and a random
689  * node name is assigned to this node (it will be fixed later when we
690  * receive the first pong).
691  *
692  * The node is created and returned to the user, but it is not automatically
693  * added to the nodes hash table. */
694 clusterNode *createClusterNode(char *nodename, int flags) {
695     clusterNode *node = zmalloc(sizeof(*node));
696 
697     if (nodename)
698         memcpy(node->name, nodename, CLUSTER_NAMELEN);
699     else
700         getRandomHexChars(node->name, CLUSTER_NAMELEN);
701     node->ctime = mstime();
702     node->configEpoch = 0;
703     node->flags = flags;
704     memset(node->slots,0,sizeof(node->slots));
705     node->numslots = 0;
706     node->numslaves = 0;
707     node->slaves = NULL;
708     node->slaveof = NULL;
709     node->ping_sent = node->pong_received = 0;
710     node->fail_time = 0;
711     node->link = NULL;
712     memset(node->ip,0,sizeof(node->ip));
713     node->port = 0;
714     node->cport = 0;
715     node->fail_reports = listCreate();
716     node->voted_time = 0;
717     node->orphaned_time = 0;
718     node->repl_offset_time = 0;
719     node->repl_offset = 0;
720     listSetFreeMethod(node->fail_reports,zfree);
721     return node;
722 }
723 
724 /* This function is called every time we get a failure report from a node.
725  * The side effect is to populate the fail_reports list (or to update
726  * the timestamp of an existing report).
727  *
728  * 'failing' is the node that is in failure state according to the
729  * 'sender' node.
730  *
731  * The function returns 0 if it just updates a timestamp of an existing
732  * failure report from the same sender. 1 is returned if a new failure
733  * report is created. */
734 int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
735     list *l = failing->fail_reports;
736     listNode *ln;
737     listIter li;
738     clusterNodeFailReport *fr;
739 
740     /* If a failure report from the same sender already exists, just update
741      * the timestamp. */
742     listRewind(l,&li);
743     while ((ln = listNext(&li)) != NULL) {
744         fr = ln->value;
745         if (fr->node == sender) {
746             fr->time = mstime();
747             return 0;
748         }
749     }
750 
751     /* Otherwise create a new report. */
752     fr = zmalloc(sizeof(*fr));
753     fr->node = sender;
754     fr->time = mstime();
755     listAddNodeTail(l,fr);
756     return 1;
757 }
758 
759 /* Remove failure reports that are too old, where too old means reasonably
760  * older than the global node timeout. Note that anyway for a node to be
761  * flagged as FAIL we need to have a local PFAIL state that is at least
762  * older than the global node timeout, so we don't just trust the number
763  * of failure reports from other nodes. */
764 void clusterNodeCleanupFailureReports(clusterNode *node) {
765     list *l = node->fail_reports;
766     listNode *ln;
767     listIter li;
768     clusterNodeFailReport *fr;
769     mstime_t maxtime = server.cluster_node_timeout *
770                      CLUSTER_FAIL_REPORT_VALIDITY_MULT;
771     mstime_t now = mstime();
772 
773     listRewind(l,&li);
774     while ((ln = listNext(&li)) != NULL) {
775         fr = ln->value;
776         if (now - fr->time > maxtime) listDelNode(l,ln);
777     }
778 }
779 
780 /* Remove the failure report for 'node' if it was previously considered
781  * failing by 'sender'. This function is called when a node informs us via
782  * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
783  *
784  * Note that this function is called relatively often as it gets called even
785  * when there are no nodes failing, and is O(N), however when the cluster is
786  * fine the failure reports list is empty so the function runs in constant
787  * time.
788  *
789  * The function returns 1 if the failure report was found and removed.
790  * Otherwise 0 is returned. */
791 int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
792     list *l = node->fail_reports;
793     listNode *ln;
794     listIter li;
795     clusterNodeFailReport *fr;
796 
797     /* Search for a failure report from this sender. */
798     listRewind(l,&li);
799     while ((ln = listNext(&li)) != NULL) {
800         fr = ln->value;
801         if (fr->node == sender) break;
802     }
803     if (!ln) return 0; /* No failure report from this sender. */
804 
805     /* Remove the failure report. */
806     listDelNode(l,ln);
807     clusterNodeCleanupFailureReports(node);
808     return 1;
809 }
810 
811 /* Return the number of external nodes that believe 'node' is failing,
812  * not including this node, which may have a PFAIL or FAIL state for this
813  * node as well. */
814 int clusterNodeFailureReportsCount(clusterNode *node) {
815     clusterNodeCleanupFailureReports(node);
816     return listLength(node->fail_reports);
817 }
818 
819 int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
820     int j;
821 
822     for (j = 0; j < master->numslaves; j++) {
823         if (master->slaves[j] == slave) {
824             if ((j+1) < master->numslaves) {
825                 int remaining_slaves = (master->numslaves - j) - 1;
826                 memmove(master->slaves+j,master->slaves+(j+1),
827                         (sizeof(*master->slaves) * remaining_slaves));
828             }
829             master->numslaves--;
830             if (master->numslaves == 0)
831                 master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
832             return C_OK;
833         }
834     }
835     return C_ERR;
836 }
837 
838 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
839     int j;
840 
841     /* If it's already a slave, don't add it again. */
842     for (j = 0; j < master->numslaves; j++)
843         if (master->slaves[j] == slave) return C_ERR;
844     master->slaves = zrealloc(master->slaves,
845         sizeof(clusterNode*)*(master->numslaves+1));
846     master->slaves[master->numslaves] = slave;
847     master->numslaves++;
848     master->flags |= CLUSTER_NODE_MIGRATE_TO;
849     return C_OK;
850 }
851 
852 int clusterCountNonFailingSlaves(clusterNode *n) {
853     int j, okslaves = 0;
854 
855     for (j = 0; j < n->numslaves; j++)
856         if (!nodeFailed(n->slaves[j])) okslaves++;
857     return okslaves;
858 }
859 
860 /* Low level cleanup of the node structure. Only called by clusterDelNode(). */
861 void freeClusterNode(clusterNode *n) {
862     sds nodename;
863     int j;
864 
865     /* If the node has associated slaves, we have to set
866      * all the slaves->slaveof fields to NULL (unknown). */
867     for (j = 0; j < n->numslaves; j++)
868         n->slaves[j]->slaveof = NULL;
869 
870     /* Remove this node from the list of slaves of its master. */
871     if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);
872 
873     /* Unlink from the set of nodes. */
874     nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
875     serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
876     sdsfree(nodename);
877 
878     /* Release link and associated data structures. */
879     if (n->link) freeClusterLink(n->link);
880     listRelease(n->fail_reports);
881     zfree(n->slaves);
882     zfree(n);
883 }
884 
885 /* Add a node to the nodes hash table */
886 int clusterAddNode(clusterNode *node) {
887     int retval;
888 
889     retval = dictAdd(server.cluster->nodes,
890             sdsnewlen(node->name,CLUSTER_NAMELEN), node);
891     return (retval == DICT_OK) ? C_OK : C_ERR;
892 }
893 
894 /* Remove a node from the cluster. The function performs the high level
895  * cleanup, calling freeClusterNode() for the low level cleanup.
896  * Here we do the following:
897  *
898  * 1) Mark all the slots handled by it as unassigned.
899  * 2) Remove all the failure reports sent by this node and referenced by
900  *    other nodes.
901  * 3) Free the node with freeClusterNode() that will in turn remove it
902  *    from the hash table and from the list of slaves of its master, if
903  *    it is a slave node.
904  */
905 void clusterDelNode(clusterNode *delnode) {
906     int j;
907     dictIterator *di;
908     dictEntry *de;
909 
910     /* 1) Mark slots as unassigned. */
911     for (j = 0; j < CLUSTER_SLOTS; j++) {
912         if (server.cluster->importing_slots_from[j] == delnode)
913             server.cluster->importing_slots_from[j] = NULL;
914         if (server.cluster->migrating_slots_to[j] == delnode)
915             server.cluster->migrating_slots_to[j] = NULL;
916         if (server.cluster->slots[j] == delnode)
917             clusterDelSlot(j);
918     }
919 
920     /* 2) Remove failure reports. */
921     di = dictGetSafeIterator(server.cluster->nodes);
922     while((de = dictNext(di)) != NULL) {
923         clusterNode *node = dictGetVal(de);
924 
925         if (node == delnode) continue;
926         clusterNodeDelFailureReport(node,delnode);
927     }
928     dictReleaseIterator(di);
929 
930     /* 3) Free the node, unlinking it from the cluster. */
931     freeClusterNode(delnode);
932 }
933 
934 /* Node lookup by name */
935 clusterNode *clusterLookupNode(const char *name) {
936     sds s = sdsnewlen(name, CLUSTER_NAMELEN);
937     dictEntry *de;
938 
939     de = dictFind(server.cluster->nodes,s);
940     sdsfree(s);
941     if (de == NULL) return NULL;
942     return dictGetVal(de);
943 }
944 
945 /* This is only used after the handshake. When we connect a given IP/PORT
946  * as a result of CLUSTER MEET we don't have the node name yet, so we
947  * pick a random one, and will fix it when we receive the PONG request using
948  * this function. */
949 void clusterRenameNode(clusterNode *node, char *newname) {
950     int retval;
951     sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);
952 
953     serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
954         node->name, newname);
955     retval = dictDelete(server.cluster->nodes, s);
956     sdsfree(s);
957     serverAssert(retval == DICT_OK);
958     memcpy(node->name, newname, CLUSTER_NAMELEN);
959     clusterAddNode(node);
960 }
961 
962 /* -----------------------------------------------------------------------------
963  * CLUSTER config epoch handling
964  * -------------------------------------------------------------------------- */
965 
966 /* Return the greatest configEpoch found in the cluster, or the current
967  * epoch if greater than any node configEpoch. */
968 uint64_t clusterGetMaxEpoch(void) {
969     uint64_t max = 0;
970     dictIterator *di;
971     dictEntry *de;
972 
973     di = dictGetSafeIterator(server.cluster->nodes);
974     while((de = dictNext(di)) != NULL) {
975         clusterNode *node = dictGetVal(de);
976         if (node->configEpoch > max) max = node->configEpoch;
977     }
978     dictReleaseIterator(di);
979     if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
980     return max;
981 }
982 
983 /* If this node epoch is zero or is not already the greatest across the
984  * cluster (from the POV of the local configuration), this function will:
985  *
986  * 1) Generate a new config epoch, incrementing the current epoch.
987  * 2) Assign the new epoch to this node, WITHOUT any consensus.
988  * 3) Persist the configuration on disk before sending packets with the
989  *    new configuration.
990  *
991  * If the new config epoch is generated and assigned, C_OK is returned,
992  * otherwise C_ERR is returned (since the node already has the greatest
993  * configuration around) and no operation is performed.
994  *
995  * Important note: this function violates the principle that config epochs
996  * should be generated with consensus and should be unique across the cluster.
997  * However Redis Cluster uses these auto-generated new config epochs in two
998  * cases:
999  *
1000  * 1) When slots are closed after importing. Otherwise resharding would be
1001  *    too expensive.
1002  * 2) When CLUSTER FAILOVER is called with options that force a slave to
1003  *    failover its master even if there is no master majority able to
1004  *    create a new configuration epoch.
1005  *
1006  * Redis Cluster will not explode using this function, even in the case of
1007  * a collision between this node and another node, generating the same
1008  * configuration epoch unilaterally, because the config epoch conflict
1009  * resolution algorithm will eventually move colliding nodes to different
1010  * config epochs. However using this function may violate the "last failover
1011  * wins" rule, so should only be used with care. */
1012 int clusterBumpConfigEpochWithoutConsensus(void) {
1013     uint64_t maxEpoch = clusterGetMaxEpoch();
1014 
1015     if (myself->configEpoch == 0 ||
1016         myself->configEpoch != maxEpoch)
1017     {
1018         server.cluster->currentEpoch++;
1019         myself->configEpoch = server.cluster->currentEpoch;
1020         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1021                              CLUSTER_TODO_FSYNC_CONFIG);
1022         serverLog(LL_WARNING,
1023             "New configEpoch set to %llu",
1024             (unsigned long long) myself->configEpoch);
1025         return C_OK;
1026     } else {
1027         return C_ERR;
1028     }
1029 }
1030 
1031 /* This function is called when this node is a master, and we receive from
1032  * another master a configuration epoch that is equal to our configuration
1033  * epoch.
1034  *
1035  * BACKGROUND
1036  *
1037  * It is not possible that different slaves get the same config
1038  * epoch during a failover election, because the slaves need to be voted
1039  * by a majority. However when we perform a manual resharding of the cluster
1040  * the node will assign a configuration epoch to itself without asking
1041  * for agreement. Usually resharding happens when the cluster is working well
1042  * and is supervised by the sysadmin, however it is possible for a failover
1043  * to happen exactly while the node we are resharding a slot to assigns itself
1044  * a new configuration epoch, but before it is able to propagate it.
1045  *
1046  * So technically it is possible in this condition that two nodes end with
1047  * the same configuration epoch.
1048  *
1049  * Another possibility is that there are bugs in the implementation causing
1050  * this to happen.
1051  *
1052  * Moreover when a new cluster is created, all the nodes start with the same
1053  * configEpoch. This collision resolution code allows nodes to automatically
1054  * end up with different configEpochs at startup.
1055  *
1056  * In all the cases, we want a mechanism that resolves this issue automatically
1057  * as a safeguard. The same configuration epoch for masters serving different
1058  * sets of slots is not harmful, but it is if the nodes end up serving the same
1059  * slots for some reason (manual errors or software bugs) without a proper
1060  * failover procedure.
1061  *
1062  * In general we want a system that eventually always ends with different
1063  * masters having different configuration epochs whatever happened, since
1064  * nothing is worse than a split-brain condition in a distributed system.
1065  *
1066  * BEHAVIOR
1067  *
1068  * When this function gets called, what happens is that if this node
1069  * has the lexicographically smaller Node ID compared to the other node
1070  * with the conflicting epoch (the 'sender' node), it will assign itself
1071  * the greatest configuration epoch currently detected among nodes plus 1.
1072  *
1073  * This means that even if there are multiple nodes colliding, the node
1074  * with the greatest Node ID never moves forward, so eventually all the nodes
1075  * end with a different configuration epoch.
1076  */
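/* A worked example with hypothetical nodes: masters A and B both end up
 * with configEpoch 5 while the cluster currentEpoch is also 5. If A has the
 * lexicographically smaller Node ID, A bumps currentEpoch to 6 and assigns
 * itself configEpoch 6, while B ignores the collision: the two epochs now
 * differ again. */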
1077 void clusterHandleConfigEpochCollision(clusterNode *sender) {
1078     /* Prerequisites: nodes have the same configEpoch and are both masters. */
1079     if (sender->configEpoch != myself->configEpoch ||
1080         !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
1081     /* Don't act if the colliding node has a smaller Node ID. */
1082     if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
1083     /* Get the next ID available at the best of this node knowledge. */
1084     server.cluster->currentEpoch++;
1085     myself->configEpoch = server.cluster->currentEpoch;
1086     clusterSaveConfigOrDie(1);
1087     serverLog(LL_VERBOSE,
1088         "WARNING: configEpoch collision with node %.40s."
1089         " configEpoch set to %llu",
1090         sender->name,
1091         (unsigned long long) myself->configEpoch);
1092 }
1093 
1094 /* -----------------------------------------------------------------------------
1095  * CLUSTER nodes blacklist
1096  *
1097  * The nodes blacklist is just a way to ensure that a given node with a given
1098  * Node ID is not re-added before some time has elapsed (this time is specified
1099  * in seconds in CLUSTER_BLACKLIST_TTL).
1100  *
1101  * This is useful when we want to remove a node from the cluster completely:
1102  * when CLUSTER FORGET is called, it also puts the node into the blacklist so
1103  * that even if we receive gossip messages from other nodes that still remember
1104  * about the node we want to remove, we don't re-add it before some time.
1105  *
1106  * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute; this means
1107  * that redis-trib has 60 seconds to send CLUSTER FORGET messages to nodes
1108  * in the cluster without dealing with the problem of other nodes re-adding
1109  * back the node to nodes we already sent the FORGET command to.
1110  *
1111  * The data structure used is a hash table with an sds string representing
1112  * the node ID as key, and the time when it is ok to re-add the node as
1113  * value.
1114  * -------------------------------------------------------------------------- */
1115 
1116 #define CLUSTER_BLACKLIST_TTL 60      /* 1 minute. */
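/* A typical flow (illustrative): CLUSTER FORGET calls
 * clusterBlacklistAddNode() just before deleting the node, and the gossip
 * processing code checks clusterBlacklistExists() before starting a new
 * handshake, so the forgotten node is not re-added for up to
 * CLUSTER_BLACKLIST_TTL seconds. */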
1117 
1118 
1119 /* Before the addNode() or Exists() operations we always remove expired
1120  * entries from the black list. This is an O(N) operation but it is not a
1121  * problem since add / exists operations are called very infrequently and
1122  * the hash table is supposed to contain very few elements at most.
1123  * However without the cleanup during long uptimes and with some automated
1124  * node add/removal procedures, entries could accumulate. */
1125 void clusterBlacklistCleanup(void) {
1126     dictIterator *di;
1127     dictEntry *de;
1128 
1129     di = dictGetSafeIterator(server.cluster->nodes_black_list);
1130     while((de = dictNext(di)) != NULL) {
1131         int64_t expire = dictGetUnsignedIntegerVal(de);
1132 
1133         if (expire < server.unixtime)
1134             dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
1135     }
1136     dictReleaseIterator(di);
1137 }
1138 
1139 /* Cleanup the blacklist and add a new node ID to the black list. */
1140 void clusterBlacklistAddNode(clusterNode *node) {
1141     dictEntry *de;
1142     sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);
1143 
1144     clusterBlacklistCleanup();
1145     if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
1146         /* If the key was added, duplicate the sds string representation of
1147          * the key for the next lookup. We'll free it at the end. */
1148         id = sdsdup(id);
1149     }
1150     de = dictFind(server.cluster->nodes_black_list,id);
1151     dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
1152     sdsfree(id);
1153 }
1154 
1155 /* Return non-zero if the specified node ID exists in the blacklist.
1156  * You don't need to pass an sds string here, any pointer to 40 bytes
1157  * will work. */
1158 int clusterBlacklistExists(char *nodeid) {
1159     sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
1160     int retval;
1161 
1162     clusterBlacklistCleanup();
1163     retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
1164     sdsfree(id);
1165     return retval;
1166 }
1167 
1168 /* -----------------------------------------------------------------------------
1169  * CLUSTER messages exchange - PING/PONG and gossip
1170  * -------------------------------------------------------------------------- */
1171 
1172 /* This function checks if a given node should be marked as FAIL.
1173  * It happens if the following conditions are met:
1174  *
1175  * 1) We received enough failure reports from other master nodes via gossip.
1176  *    Enough means that the majority of the masters signaled the node is
1177  *    down recently.
1178  * 2) We believe this node is in PFAIL state.
1179  *
1180  * If a failure is detected we also inform the whole cluster about this
1181  * event trying to force every other node to set the FAIL flag for the node.
1182  *
1183  * Note that the form of agreement used here is weak, as we collect the majority
1184  * of masters' state over some time, and even if we force agreement by
1185  * propagating the FAIL message, because of partitions we may not reach every
1186  * node. However:
1187  *
1188  * 1) Either we reach the majority and eventually the FAIL state will propagate
1189  *    to all the cluster.
1190  * 2) Or there is no majority so no slave promotion will be authorized and the
1191  *    FAIL flag will be cleared after some time.
1192  */
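/* For example (illustrative): with server.cluster->size == 5 the needed
 * quorum is 5/2+1 == 3, so two failure reports from other masters plus our
 * own PFAIL view (if we are a master) are enough to flag the node FAIL. */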
1193 void markNodeAsFailingIfNeeded(clusterNode *node) {
1194     int failures;
1195     int needed_quorum = (server.cluster->size / 2) + 1;
1196 
1197     if (!nodeTimedOut(node)) return; /* We can reach it. */
1198     if (nodeFailed(node)) return; /* Already FAILing. */
1199 
1200     failures = clusterNodeFailureReportsCount(node);
1201     /* Also count myself as a voter if I'm a master. */
1202     if (nodeIsMaster(myself)) failures++;
1203     if (failures < needed_quorum) return; /* No weak agreement from masters. */
1204 
1205     serverLog(LL_NOTICE,
1206         "Marking node %.40s as failing (quorum reached).", node->name);
1207 
1208     /* Mark the node as failing. */
1209     node->flags &= ~CLUSTER_NODE_PFAIL;
1210     node->flags |= CLUSTER_NODE_FAIL;
1211     node->fail_time = mstime();
1212 
1213     /* Broadcast the failing node name to everybody, forcing all the other
1214      * reachable nodes to flag the node as FAIL. */
1215     if (nodeIsMaster(myself)) clusterSendFail(node->name);
1216     clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1217 }
1218 
1219 /* This function is called only if a node is marked as FAIL, but we are able
1220  * to reach it again. It checks whether the conditions to undo the FAIL
1221  * state are met. */
1222 void clearNodeFailureIfNeeded(clusterNode *node) {
1223     mstime_t now = mstime();
1224 
1225     serverAssert(nodeFailed(node));
1226 
1227     /* For slaves we always clear the FAIL flag if we can contact the
1228      * node again. */
1229     if (nodeIsSlave(node) || node->numslots == 0) {
1230         serverLog(LL_NOTICE,
1231             "Clear FAIL state for node %.40s: %s is reachable again.",
1232                 node->name,
1233                 nodeIsSlave(node) ? "replica" : "master without slots");
1234         node->flags &= ~CLUSTER_NODE_FAIL;
1235         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1236     }
1237 
1238     /* If it is a master and...
1239      * 1) The FAIL state is old enough.
1240  * 2) It is still serving slots from our point of view (not failed over).
1241      * Apparently no one is going to fix these slots, clear the FAIL flag. */
1242     if (nodeIsMaster(node) && node->numslots > 0 &&
1243         (now - node->fail_time) >
1244         (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
1245     {
1246         serverLog(LL_NOTICE,
1247             "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
1248                 node->name);
1249         node->flags &= ~CLUSTER_NODE_FAIL;
1250         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1251     }
1252 }
1253 
1254 /* Return true if we already have a node in HANDSHAKE state matching the
1255  * specified ip address and port number. This function is used in order to
1256  * avoid adding a new handshake node for the same address multiple times. */
1257 int clusterHandshakeInProgress(char *ip, int port, int cport) {
1258     dictIterator *di;
1259     dictEntry *de;
1260 
1261     di = dictGetSafeIterator(server.cluster->nodes);
1262     while((de = dictNext(di)) != NULL) {
1263         clusterNode *node = dictGetVal(de);
1264 
1265         if (!nodeInHandshake(node)) continue;
1266         if (!strcasecmp(node->ip,ip) &&
1267             node->port == port &&
1268             node->cport == cport) break;
1269     }
1270     dictReleaseIterator(di);
1271     return de != NULL;
1272 }
1273 
1274 /* Start a handshake with the specified address if there is not one
1275  * already in progress. Returns non-zero if the handshake was actually
1276  * started. On error zero is returned and errno is set to one of the
1277  * following values:
1278  *
1279  * EAGAIN - There is already a handshake in progress for this address.
1280  * EINVAL - IP or port are not valid. */
1281 int clusterStartHandshake(char *ip, int port, int cport) {
1282     clusterNode *n;
1283     char norm_ip[NET_IP_STR_LEN];
1284     struct sockaddr_storage sa;
1285 
1286     /* IP sanity check */
1287     if (inet_pton(AF_INET,ip,
1288             &(((struct sockaddr_in *)&sa)->sin_addr)))
1289     {
1290         sa.ss_family = AF_INET;
1291     } else if (inet_pton(AF_INET6,ip,
1292             &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
1293     {
1294         sa.ss_family = AF_INET6;
1295     } else {
1296         errno = EINVAL;
1297         return 0;
1298     }
1299 
1300     /* Port sanity check */
1301     if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
1302         errno = EINVAL;
1303         return 0;
1304     }
1305 
1306     /* Set norm_ip as the normalized string representation of the node
1307      * IP address. */
1308     memset(norm_ip,0,NET_IP_STR_LEN);
1309     if (sa.ss_family == AF_INET)
1310         inet_ntop(AF_INET,
1311             (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
1312             norm_ip,NET_IP_STR_LEN);
1313     else
1314         inet_ntop(AF_INET6,
1315             (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
1316             norm_ip,NET_IP_STR_LEN);
1317 
1318     if (clusterHandshakeInProgress(norm_ip,port,cport)) {
1319         errno = EAGAIN;
1320         return 0;
1321     }
1322 
1323     /* Add the node with a random address (NULL as first argument to
1324      * createClusterNode()). Everything will be fixed during the
1325      * handshake. */
1326     n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
1327     memcpy(n->ip,norm_ip,sizeof(n->ip));
1328     n->port = port;
1329     n->cport = cport;
1330     clusterAddNode(n);
1331     return 1;
1332 }
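/* [Editor's note] A self-contained sketch of the normalization round-trip
 * used above: parse with inet_pton(), then print back with inet_ntop() so
 * that equivalent textual forms (e.g. "0:0:0:0:0:0:0:1" vs "::1") map to a
 * single canonical string. The helper name is hypothetical; it relies only
 * on <arpa/inet.h>, already included at the top of this file. */
static int normalizeIp(const char *in, char *out, socklen_t outlen) {
    struct sockaddr_storage sa;
    if (inet_pton(AF_INET,in,&((struct sockaddr_in *)&sa)->sin_addr) == 1)
        return inet_ntop(AF_INET,&((struct sockaddr_in *)&sa)->sin_addr,
                         out,outlen) != NULL;
    if (inet_pton(AF_INET6,in,&((struct sockaddr_in6 *)&sa)->sin6_addr) == 1)
        return inet_ntop(AF_INET6,&((struct sockaddr_in6 *)&sa)->sin6_addr,
                         out,outlen) != NULL;
    return 0; /* Not a valid IPv4 or IPv6 literal. */
}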
1333 
1334 /* Process the gossip section of PING or PONG packets.
1335  * Note that this function assumes that the packet was already sanity-checked
1336  * by the caller in terms of its length, not in terms of the content of the
1337  * gossip section. */
1338 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
1339     uint16_t count = ntohs(hdr->count);
1340     clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
1341     clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
1342 
1343     while(count--) {
1344         uint16_t flags = ntohs(g->flags);
1345         clusterNode *node;
1346         sds ci;
1347 
1348         if (server.verbosity == LL_DEBUG) {
1349             ci = representClusterNodeFlags(sdsempty(), flags);
1350             serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
1351                 g->nodename,
1352                 g->ip,
1353                 ntohs(g->port),
1354                 ntohs(g->cport),
1355                 ci);
1356             sdsfree(ci);
1357         }
1358 
1359         /* Update our state according to the gossip section. */
1360         node = clusterLookupNode(g->nodename);
1361         if (node) {
1362             /* We already know this node.
1363                Handle failure reports only when the sender is a master. */
1364             if (sender && nodeIsMaster(sender) && node != myself) {
1365                 if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
1366                     if (clusterNodeAddFailureReport(node,sender)) {
1367                         serverLog(LL_VERBOSE,
1368                             "Node %.40s reported node %.40s as not reachable.",
1369                             sender->name, node->name);
1370                     }
1371                     markNodeAsFailingIfNeeded(node);
1372                 } else {
1373                     if (clusterNodeDelFailureReport(node,sender)) {
1374                         serverLog(LL_VERBOSE,
1375                             "Node %.40s reported node %.40s is back online.",
1376                             sender->name, node->name);
1377                     }
1378                 }
1379             }
1380 
1381             /* If from our POV the node is up (no failure flags are set),
1382              * we have no pending ping for the node, and no failure
1383              * reports for this node, update the last pong time with the
1384              * one we see from the other nodes. */
1385             if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1386                 node->ping_sent == 0 &&
1387                 clusterNodeFailureReportsCount(node) == 0)
1388             {
1389                 mstime_t pongtime = ntohl(g->pong_received);
1390                 pongtime *= 1000; /* Convert back to milliseconds. */
1391 
1392                 /* Replace the pong time with the received one only if
1393                  * it's greater than our view but is not in the future
1394                  * (with 500 milliseconds tolerance) from the POV of our
1395                  * clock. */
1396                 if (pongtime <= (server.mstime+500) &&
1397                     pongtime > node->pong_received)
1398                 {
1399                     node->pong_received = pongtime;
1400                 }
1401             }
1402 
1403             /* If we already know this node, but it is not reachable, and
1404              * we see a different address in the gossip section of a node that
1405              * can talk with this other node, update the address and disconnect
1406              * the old link if any, so that we'll attempt to connect to the
1407              * new address. */
1408             if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
1409                 !(flags & CLUSTER_NODE_NOADDR) &&
1410                 !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1411                 (strcasecmp(node->ip,g->ip) ||
1412                  node->port != ntohs(g->port) ||
1413                  node->cport != ntohs(g->cport)))
1414             {
1415                 if (node->link) freeClusterLink(node->link);
1416                 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
1417                 node->port = ntohs(g->port);
1418                 node->cport = ntohs(g->cport);
1419                 node->flags &= ~CLUSTER_NODE_NOADDR;
1420             }
1421         } else {
1422             /* If it's not in NOADDR state and we don't have it, we
1423              * start a handshake process against this IP/PORT pair.
1424              *
1425              * Note that we require that the sender of this gossip message
1426              * is a well known node in our cluster, otherwise we risk
1427              * joining another cluster. */
1428             if (sender &&
1429                 !(flags & CLUSTER_NODE_NOADDR) &&
1430                 !clusterBlacklistExists(g->nodename))
1431             {
1432                 clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport));
1433             }
1434         }
1435 
1436         /* Next node */
1437         g++;
1438     }
1439 }
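/* [Editor's note] The wire handling of the loop above, reduced to a sketch:
 * gossip entries are packed back to back after the header, and every
 * multi-byte field travels in network byte order, so ports and flags need
 * an ntohs() before use. Assumes a valid, length-checked packet; the printf
 * is for illustration only and the helper name is hypothetical. */
static void dumpGossipSection(clusterMsg *hdr) {
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    while(count--) {
        printf("%.40s %s:%d@%d flags=%d\n", g->nodename, g->ip,
               ntohs(g->port), ntohs(g->cport), ntohs(g->flags));
        g++; /* Entries are contiguous in memory: step to the next one. */
    }
}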
1440 
1441 /* IP -> string conversion. 'buf' is supposed to be at least 46 bytes long.
1442  * If 'announced_ip' length is non-zero, it is used instead of extracting
1443  * the IP from the socket peer address. */
1444 void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
1445     if (announced_ip[0] != '\0') {
1446         memcpy(buf,announced_ip,NET_IP_STR_LEN);
1447         buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
1448     } else {
1449         anetPeerToString(link->fd, buf, NET_IP_STR_LEN, NULL);
1450     }
1451 }
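/* [Editor's note] The defensive copy above, isolated: 'announced_ip' comes
 * straight off the wire, so it is copied at fixed size and explicitly
 * NUL-terminated instead of being trusted as a valid C string. Hypothetical
 * helper; assumes 'src' points to a buffer of at least NET_IP_STR_LEN
 * bytes, as hdr->myip does. */
static void copyUntrustedIp(char *dst, const char *src) {
    memcpy(dst,src,NET_IP_STR_LEN);
    dst[NET_IP_STR_LEN-1] = '\0'; /* Guarantee termination whatever the input. */
}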
1452 
1453 /* Update the node address to the IP address that can be extracted
1454  * from link->fd, or if hdr->myip is non-empty, to the address the node
1455  * is announcing to us. The port is taken from the packet header as well.
1456  *
1457  * If the address or port changed, disconnect the node link so that we'll
1458  * connect again to the new address.
1459  *
1460  * If the ip/port pair is already correct, no operation is performed at
1461  * all.
1462  *
1463  * The function returns 0 if the node address is still the same,
1464  * otherwise 1 is returned. */
1465 int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
1466                               clusterMsg *hdr)
1467 {
1468     char ip[NET_IP_STR_LEN] = {0};
1469     int port = ntohs(hdr->port);
1470     int cport = ntohs(hdr->cport);
1471 
1472     /* We don't proceed if the link is the same as the sender link, as this
1473      * function is designed to see if the node link is consistent with the
1474      * symmetric link that is used to receive PINGs from the node.
1475      *
1476      * As a side effect this function never frees the passed 'link', so
1477      * it is safe to call during packet processing. */
1478     if (link == node->link) return 0;
1479 
1480     nodeIp2String(ip,link,hdr->myip);
1481     if (node->port == port && node->cport == cport &&
1482         strcmp(ip,node->ip) == 0) return 0;
1483 
1484     /* IP / port is different, update it. */
1485     memcpy(node->ip,ip,sizeof(ip));
1486     node->port = port;
1487     node->cport = cport;
1488     if (node->link) freeClusterLink(node->link);
1489     node->flags &= ~CLUSTER_NODE_NOADDR;
1490     serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
1491         node->name, node->ip, node->port);
1492 
1493     /* Check if this is our master and we have to change the
1494      * replication target as well. */
1495     if (nodeIsSlave(myself) && myself->slaveof == node)
1496         replicationSetMaster(node->ip, node->port);
1497     return 1;
1498 }
1499 
1500 /* Reconfigure the specified node 'n' as a master. This function is called
1501  * in order to update our state when a node that we believed to be a slave
1502  * is now acting as a master. */
1503 void clusterSetNodeAsMaster(clusterNode *n) {
1504     if (nodeIsMaster(n)) return;
1505 
1506     if (n->slaveof) {
1507         clusterNodeRemoveSlave(n->slaveof,n);
1508         if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
1509     }
1510     n->flags &= ~CLUSTER_NODE_SLAVE;
1511     n->flags |= CLUSTER_NODE_MASTER;
1512     n->slaveof = NULL;
1513 
1514     /* Update config and state. */
1515     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1516                          CLUSTER_TODO_UPDATE_STATE);
1517 }
1518 
1519 /* This function is called when we receive a master configuration via a
1520  * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
1521  * node, and the set of slots claimed under this configEpoch.
1522  *
1523  * What we do is to rebind the slots with newer configuration compared to our
1524  * local configuration, and if needed, we turn ourselves into a replica of the
1525  * node (see the function comments for more info).
1526  *
1527  * The 'sender' is the node for which we received a configuration update.
1528  * Sometimes it is not actually the "Sender" of the information, like in the
1529  * case we receive the info via an UPDATE packet. */
1530 void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
1531     int j;
1532     clusterNode *curmaster, *newmaster = NULL;
1533     /* The dirty slots list is a list of slots for which we lose ownership
1534      * while still having keys inside. This usually happens after a failover
1535      * or after a manual cluster reconfiguration operated by the admin.
1536      *
1537      * If the update message is not able to demote a master to slave (in this
1538      * case we'll resync with the master updating the whole key space), we
1539      * need to delete all the keys in the slots we lost ownership of. */
1540     uint16_t dirty_slots[CLUSTER_SLOTS];
1541     int dirty_slots_count = 0;
1542 
1543     /* Here we set curmaster to this node or the node this node
1544      * replicates to if it's a slave. In the for loop we are
1545      * interested in checking if slots are taken away from curmaster. */
1546     curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;
1547 
1548     if (sender == myself) {
1549         serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
1550         return;
1551     }
1552 
1553     for (j = 0; j < CLUSTER_SLOTS; j++) {
1554         if (bitmapTestBit(slots,j)) {
1555             /* The slot is already bound to the sender of this message. */
1556             if (server.cluster->slots[j] == sender) continue;
1557 
1558             /* The slot is in importing state, it should be modified only
1559              * manually via redis-trib (example: a resharding is in progress
1560              * and the migrating side slot was already closed and is advertising
1561              * a new config. We still want the slot to be closed manually). */
1562             if (server.cluster->importing_slots_from[j]) continue;
1563 
1564             /* We rebind the slot to the new node claiming it if:
1565              * 1) The slot was unassigned or the new node claims it with a
1566              *    greater configEpoch.
1567              * 2) We are not currently importing the slot. */
1568             if (server.cluster->slots[j] == NULL ||
1569                 server.cluster->slots[j]->configEpoch < senderConfigEpoch)
1570             {
1571                 /* Was this slot mine, and still contains keys? Mark it as
1572                  * a dirty slot. */
1573                 if (server.cluster->slots[j] == myself &&
1574                     countKeysInSlot(j) &&
1575                     sender != myself)
1576                 {
1577                     dirty_slots[dirty_slots_count] = j;
1578                     dirty_slots_count++;
1579                 }
1580 
1581                 if (server.cluster->slots[j] == curmaster)
1582                     newmaster = sender;
1583                 clusterDelSlot(j);
1584                 clusterAddSlot(sender,j);
1585                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1586                                      CLUSTER_TODO_UPDATE_STATE|
1587                                      CLUSTER_TODO_FSYNC_CONFIG);
1588             }
1589         }
1590     }
1591 
1592     /* After updating the slots configuration, don't do any actual change
1593      * in the state of the server if a module disabled Redis Cluster
1594      * key redirections. */
1595     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
1596         return;
1597 
1598     /* If at least one slot was reassigned from a node to another node
1599      * with a greater configEpoch, it is possible that:
1600      * 1) We are a master left without slots. This means that we were
1601      *    failed over and we should turn into a replica of the new
1602      *    master.
1603      * 2) We are a slave and our master is left without slots. We need
1604      *    to replicate to the new slots owner. */
1605     if (newmaster && curmaster->numslots == 0) {
1606         serverLog(LL_WARNING,
1607             "Configuration change detected. Reconfiguring myself "
1608             "as a replica of %.40s", sender->name);
1609         clusterSetMaster(sender);
1610         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1611                              CLUSTER_TODO_UPDATE_STATE|
1612                              CLUSTER_TODO_FSYNC_CONFIG);
1613     } else if (dirty_slots_count) {
1614         /* If we are here, we received an update message which removed
1615          * ownership of certain slots we still have keys in, but we are
1616          * still serving some slots, so this master node was not demoted
1617          * to a slave.
1618          *
1619          * In order to maintain a consistent state between keys and slots
1620          * we need to remove all the keys from the slots we lost. */
1621         for (j = 0; j < dirty_slots_count; j++)
1622             delKeysInSlot(dirty_slots[j]);
1623     }
1624 }
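/* [Editor's note] The slot bitmap convention driving the loop above, as a
 * one-line sketch: CLUSTER_SLOTS bits packed eight per byte, with slot
 * 'pos' living at bit (pos&7) of byte (pos/8). This mirrors bitmapTestBit()
 * declared at the top of this file; the helper name is hypothetical. */
static int slotIsClaimed(unsigned char *slots, int pos) {
    return (slots[pos/8] & (1<<(pos&7))) != 0;
}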
1625 
1626 /* When this function is called, there is a packet to process starting
1627  * at link->rcvbuf. Releasing the buffer is up to the caller, so this
1628  * function should just handle the higher level stuff of processing the
1629  * packet, modifying the cluster state if needed.
1630  *
1631  * The function returns 1 if the link is still valid after the packet
1632  * was processed, otherwise 0 if the link was freed since the packet
1633  * processing led to some inconsistency error (for instance a PONG
1634  * received from the wrong sender ID). */
1635 int clusterProcessPacket(clusterLink *link) {
1636     clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
1637     uint32_t totlen = ntohl(hdr->totlen);
1638     uint16_t type = ntohs(hdr->type);
1639 
1640     if (type < CLUSTERMSG_TYPE_COUNT)
1641         server.cluster->stats_bus_messages_received[type]++;
1642     serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
1643         type, (unsigned long) totlen);
1644 
1645     /* Perform sanity checks */
1646     if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
1647     if (totlen > sdslen(link->rcvbuf)) return 1;
1648 
1649     if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
1650         /* Can't handle messages of different versions. */
1651         return 1;
1652     }
1653 
1654     uint16_t flags = ntohs(hdr->flags);
1655     uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
1656     clusterNode *sender;
1657 
1658     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1659         type == CLUSTERMSG_TYPE_MEET)
1660     {
1661         uint16_t count = ntohs(hdr->count);
1662         uint32_t explen; /* expected length of this packet */
1663 
1664         explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1665         explen += (sizeof(clusterMsgDataGossip)*count);
1666         if (totlen != explen) return 1;
1667     } else if (type == CLUSTERMSG_TYPE_FAIL) {
1668         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1669 
1670         explen += sizeof(clusterMsgDataFail);
1671         if (totlen != explen) return 1;
1672     } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
1673         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1674 
1675         explen += sizeof(clusterMsgDataPublish) -
1676                 8 +
1677                 ntohl(hdr->data.publish.msg.channel_len) +
1678                 ntohl(hdr->data.publish.msg.message_len);
1679         if (totlen != explen) return 1;
1680     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
1681                type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
1682                type == CLUSTERMSG_TYPE_MFSTART)
1683     {
1684         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1685 
1686         if (totlen != explen) return 1;
1687     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
1688         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1689 
1690         explen += sizeof(clusterMsgDataUpdate);
1691         if (totlen != explen) return 1;
1692     } else if (type == CLUSTERMSG_TYPE_MODULE) {
1693         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1694 
1695         explen += sizeof(clusterMsgDataPublish) -
1696                 3 + ntohl(hdr->data.module.msg.len);
1697         if (totlen != explen) return 1;
1698     }
1699 
1700     /* Check if the sender is a known node. */
1701     sender = clusterLookupNode(hdr->sender);
1702     if (sender && !nodeInHandshake(sender)) {
1703         /* Update our currentEpoch if we see a newer epoch in the cluster. */
1704         senderCurrentEpoch = ntohu64(hdr->currentEpoch);
1705         senderConfigEpoch = ntohu64(hdr->configEpoch);
1706         if (senderCurrentEpoch > server.cluster->currentEpoch)
1707             server.cluster->currentEpoch = senderCurrentEpoch;
1708         /* Update the sender configEpoch if it is publishing a newer one. */
1709         if (senderConfigEpoch > sender->configEpoch) {
1710             sender->configEpoch = senderConfigEpoch;
1711             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1712                                  CLUSTER_TODO_FSYNC_CONFIG);
1713         }
1714         /* Update the replication offset info for this node. */
1715         sender->repl_offset = ntohu64(hdr->offset);
1716         sender->repl_offset_time = mstime();
1717         /* If we are a slave performing a manual failover and our master
1718          * sent its offset while already paused, populate the MF state. */
1719         if (server.cluster->mf_end &&
1720             nodeIsSlave(myself) &&
1721             myself->slaveof == sender &&
1722             hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
1723             server.cluster->mf_master_offset == 0)
1724         {
1725             server.cluster->mf_master_offset = sender->repl_offset;
1726             serverLog(LL_WARNING,
1727                 "Received replication offset for paused "
1728                 "master manual failover: %lld",
1729                 server.cluster->mf_master_offset);
1730         }
1731     }
1732 
1733     /* Initial processing of PING and MEET requests replying with a PONG. */
1734     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
1735         serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);
1736 
1737         /* We use incoming MEET messages in order to set the address
1738          * for 'myself', since only other cluster nodes will send us
1739          * MEET messages on handshakes, when the cluster joins, or
1740          * later if we changed address, and those nodes will use our
1741          * official address to connect to us. So obtaining this address
1742          * from the socket is a simple way to discover / update our own
1743          * address in the cluster without it being hardcoded in the config.
1744          *
1745          * However if we don't have an address at all, we update the address
1746          * even with a normal PING packet. If it's wrong it will be fixed
1747          * by MEET later. */
1748         if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
1749             server.cluster_announce_ip == NULL)
1750         {
1751             char ip[NET_IP_STR_LEN];
1752 
1753             if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
1754                 strcmp(ip,myself->ip))
1755             {
1756                 memcpy(myself->ip,ip,NET_IP_STR_LEN);
1757                 serverLog(LL_WARNING,"IP address for this node updated to %s",
1758                     myself->ip);
1759                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1760             }
1761         }
1762 
1763         /* Add this node if it is new for us and the msg type is MEET.
1764          * In this stage we don't try to add the node with the right
1765          * flags, slaveof pointer, and so forth, as these details will be
1766          * resolved when we receive PONGs from the node. */
1767         if (!sender && type == CLUSTERMSG_TYPE_MEET) {
1768             clusterNode *node;
1769 
1770             node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
1771             nodeIp2String(node->ip,link,hdr->myip);
1772             node->port = ntohs(hdr->port);
1773             node->cport = ntohs(hdr->cport);
1774             clusterAddNode(node);
1775             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1776         }
1777 
1778         /* If this is a MEET packet from an unknown node, we still process
1779          * the gossip section here since we have to trust the sender because
1780          * of the message type. */
1781         if (!sender && type == CLUSTERMSG_TYPE_MEET)
1782             clusterProcessGossipSection(hdr,link);
1783 
1784         /* Anyway reply with a PONG */
1785         clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
1786     }
1787 
1788     /* PING, PONG, MEET: process config information. */
1789     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1790         type == CLUSTERMSG_TYPE_MEET)
1791     {
1792         serverLog(LL_DEBUG,"%s packet received: %p",
1793             type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
1794             (void*)link->node);
1795         if (link->node) {
1796             if (nodeInHandshake(link->node)) {
1797                 /* If we already have this node, try to change the
1798                  * IP/port of the node with the new one. */
1799                 if (sender) {
1800                     serverLog(LL_VERBOSE,
1801                         "Handshake: we already know node %.40s, "
1802                         "updating the address if needed.", sender->name);
1803                     if (nodeUpdateAddressIfNeeded(sender,link,hdr))
1804                     {
1805                         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1806                                              CLUSTER_TODO_UPDATE_STATE);
1807                     }
1808                     /* Free this node as we already have it. This will
1809                      * cause the link to be freed as well. */
1810                     clusterDelNode(link->node);
1811                     return 0;
1812                 }
1813 
1814                 /* The first thing to do is to replace the random name with the
1815                  * right node name if this was a handshake stage. */
1816                 clusterRenameNode(link->node, hdr->sender);
1817                 serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
1818                     link->node->name);
1819                 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
1820                 link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
1821                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1822             } else if (memcmp(link->node->name,hdr->sender,
1823                         CLUSTER_NAMELEN) != 0)
1824             {
1825                 /* If the reply has a non matching node ID we
1826                  * disconnect this node and set it as not having an associated
1827                  * address. */
1828                 serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
1829                     link->node->name,
1830                     (int)(mstime()-(link->node->ctime)),
1831                     link->node->flags);
1832                 link->node->flags |= CLUSTER_NODE_NOADDR;
1833                 link->node->ip[0] = '\0';
1834                 link->node->port = 0;
1835                 link->node->cport = 0;
1836                 freeClusterLink(link);
1837                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1838                 return 0;
1839             }
1840         }
1841 
1842         /* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
1843          * announced. This is a dynamic flag that we receive from the
1844          * sender, and the latest status must be trusted. We need it to
1845          * be propagated because the slave ranking, used to understand the
1846          * delay of each slave in the voting process, needs to know
1847          * which instances are really competing. */
1848         if (sender) {
1849             int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
1850             sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
1851             sender->flags |= nofailover;
1852         }
1853 
1854         /* Update the node address if it changed. */
1855         if (sender && type == CLUSTERMSG_TYPE_PING &&
1856             !nodeInHandshake(sender) &&
1857             nodeUpdateAddressIfNeeded(sender,link,hdr))
1858         {
1859             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1860                                  CLUSTER_TODO_UPDATE_STATE);
1861         }
1862 
1863         /* Update our info about the node */
1864         if (link->node && type == CLUSTERMSG_TYPE_PONG) {
1865             link->node->pong_received = mstime();
1866             link->node->ping_sent = 0;
1867 
1868             /* The PFAIL condition can be reversed without external
1869              * help if it is momentary (that is, if it does not
1870              * turn into a FAIL state).
1871              *
1872              * The FAIL condition is also reversible under specific
1873              * conditions detected by clearNodeFailureIfNeeded(). */
1874             if (nodeTimedOut(link->node)) {
1875                 link->node->flags &= ~CLUSTER_NODE_PFAIL;
1876                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1877                                      CLUSTER_TODO_UPDATE_STATE);
1878             } else if (nodeFailed(link->node)) {
1879                 clearNodeFailureIfNeeded(link->node);
1880             }
1881         }
1882 
1883         /* Check for role switch: slave -> master or master -> slave. */
1884         if (sender) {
1885             if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
1886                 sizeof(hdr->slaveof)))
1887             {
1888                 /* Node is a master. */
1889                 clusterSetNodeAsMaster(sender);
1890             } else {
1891                 /* Node is a slave. */
1892                 clusterNode *master = clusterLookupNode(hdr->slaveof);
1893 
1894                 if (nodeIsMaster(sender)) {
1895                     /* Master turned into a slave! Reconfigure the node. */
1896                     clusterDelNodeSlots(sender);
1897                     sender->flags &= ~(CLUSTER_NODE_MASTER|
1898                                        CLUSTER_NODE_MIGRATE_TO);
1899                     sender->flags |= CLUSTER_NODE_SLAVE;
1900 
1901                     /* Update config and state. */
1902                     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1903                                          CLUSTER_TODO_UPDATE_STATE);
1904                 }
1905 
1906                 /* Master node changed for this slave? */
1907                 if (master && sender->slaveof != master) {
1908                     if (sender->slaveof)
1909                         clusterNodeRemoveSlave(sender->slaveof,sender);
1910                     clusterNodeAddSlave(master,sender);
1911                     sender->slaveof = master;
1912 
1913                     /* Update config. */
1914                     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1915                 }
1916             }
1917         }
1918 
1919         /* Update our info about served slots.
1920          *
1921          * Note: this MUST happen after we update the master/slave state
1922          * so that the CLUSTER_NODE_MASTER flag will be set. */
1923 
1924         /* Many checks are only needed if the set of served slots this
1925          * instance claims is different compared to the set of slots we have
1926          * for it. Check this ASAP to avoid other computationally expensive
1927          * checks later. */
1928         clusterNode *sender_master = NULL; /* Sender or its master if slave. */
1929         int dirty_slots = 0; /* Sender claimed slots don't match my view? */
1930 
1931         if (sender) {
1932             sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
1933             if (sender_master) {
1934                 dirty_slots = memcmp(sender_master->slots,
1935                         hdr->myslots,sizeof(hdr->myslots)) != 0;
1936             }
1937         }
1938 
1939         /* 1) If the sender of the message is a master, and we detected that
1940          *    the set of slots it claims changed, scan the slots to see if we
1941          *    need to update our configuration. */
1942         if (sender && nodeIsMaster(sender) && dirty_slots)
1943             clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
1944 
1945         /* 2) We also check for the reverse condition, that is, the sender
1946          *    claims to serve slots we know are served by a master with a
1947          *    greater configEpoch. If this happens we inform the sender.
1948          *
1949          * This is useful because sometimes after a partition heals, a
1950          * reappearing master may be the last one to claim a given set of
1951          * hash slots, but with a configuration that other instances know to
1952          * be deprecated. Example:
1953          *
1954          * A and B are master and slave for slots 1,2,3.
1955          * A is partitioned away, B gets promoted.
1956          * B is partitioned away, and A returns available.
1957          *
1958          * Usually B would PING A publishing its set of served slots and its
1959          * configEpoch, but because of the partition B can't inform A of the
1960          * new configuration, so other nodes that have an updated table must
1961          * do it. In this way A will stop acting as a master (or can try to
1962          * fail over if the conditions to win the election are met). */
1963         if (sender && dirty_slots) {
1964             int j;
1965 
1966             for (j = 0; j < CLUSTER_SLOTS; j++) {
1967                 if (bitmapTestBit(hdr->myslots,j)) {
1968                     if (server.cluster->slots[j] == sender ||
1969                         server.cluster->slots[j] == NULL) continue;
1970                     if (server.cluster->slots[j]->configEpoch >
1971                         senderConfigEpoch)
1972                     {
1973                         serverLog(LL_VERBOSE,
1974                             "Node %.40s has old slots configuration, sending "
1975                             "an UPDATE message about %.40s",
1976                                 sender->name, server.cluster->slots[j]->name);
1977                         clusterSendUpdate(sender->link,
1978                             server.cluster->slots[j]);
1979 
1980                         /* TODO: instead of exiting the loop send every other
1981                          * UPDATE packet for other nodes that are the new owner
1982                          * of sender's slots. */
1983                         break;
1984                     }
1985                 }
1986             }
1987         }
1988 
1989         /* If our config epoch collides with the sender's try to fix
1990          * the problem. */
1991         if (sender &&
1992             nodeIsMaster(myself) && nodeIsMaster(sender) &&
1993             senderConfigEpoch == myself->configEpoch)
1994         {
1995             clusterHandleConfigEpochCollision(sender);
1996         }
1997 
1998         /* Get info from the gossip section */
1999         if (sender) clusterProcessGossipSection(hdr,link);
2000     } else if (type == CLUSTERMSG_TYPE_FAIL) {
2001         clusterNode *failing;
2002 
2003         if (sender) {
2004             failing = clusterLookupNode(hdr->data.fail.about.nodename);
2005             if (failing &&
2006                 !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
2007             {
2008                 serverLog(LL_NOTICE,
2009                     "FAIL message received from %.40s about %.40s",
2010                     hdr->sender, hdr->data.fail.about.nodename);
2011                 failing->flags |= CLUSTER_NODE_FAIL;
2012                 failing->fail_time = mstime();
2013                 failing->flags &= ~CLUSTER_NODE_PFAIL;
2014                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2015                                      CLUSTER_TODO_UPDATE_STATE);
2016             }
2017         } else {
2018             serverLog(LL_NOTICE,
2019                 "Ignoring FAIL message from unknown node %.40s about %.40s",
2020                 hdr->sender, hdr->data.fail.about.nodename);
2021         }
2022     } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
2023         robj *channel, *message;
2024         uint32_t channel_len, message_len;
2025 
2026         /* Don't bother creating useless objects if there are no
2027          * Pub/Sub subscribers. */
2028         if (dictSize(server.pubsub_channels) ||
2029            listLength(server.pubsub_patterns))
2030         {
2031             channel_len = ntohl(hdr->data.publish.msg.channel_len);
2032             message_len = ntohl(hdr->data.publish.msg.message_len);
2033             channel = createStringObject(
2034                         (char*)hdr->data.publish.msg.bulk_data,channel_len);
2035             message = createStringObject(
2036                         (char*)hdr->data.publish.msg.bulk_data+channel_len,
2037                         message_len);
2038             pubsubPublishMessage(channel,message);
2039             decrRefCount(channel);
2040             decrRefCount(message);
2041         }
2042     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
2043         if (!sender) return 1;  /* We don't know that node. */
2044         clusterSendFailoverAuthIfNeeded(sender,hdr);
2045     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
2046         if (!sender) return 1;  /* We don't know that node. */
2047         /* We consider this vote only if the sender is a master serving
2048          * a non zero number of slots, and its currentEpoch is greater than
2049          * or equal to the epoch in which this node started the election. */
2050         if (nodeIsMaster(sender) && sender->numslots > 0 &&
2051             senderCurrentEpoch >= server.cluster->failover_auth_epoch)
2052         {
2053             server.cluster->failover_auth_count++;
2054             /* Maybe we reached a quorum here, set a flag to make sure
2055              * we check ASAP. */
2056             clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
2057         }
2058     } else if (type == CLUSTERMSG_TYPE_MFSTART) {
2059         /* This message is acceptable only if I'm a master and the sender
2060          * is one of my slaves. */
2061         if (!sender || sender->slaveof != myself) return 1;
2062         /* Manual failover requested from slaves. Initialize the state
2063          * accordingly. */
2064         resetManualFailover();
2065         server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
2066         server.cluster->mf_slave = sender;
2067         pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*2));
2068         serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
2069             sender->name);
2070     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2071         clusterNode *n; /* The node the update is about. */
2072         uint64_t reportedConfigEpoch =
2073                     ntohu64(hdr->data.update.nodecfg.configEpoch);
2074 
2075         if (!sender) return 1;  /* We don't know the sender. */
2076         n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
2077         if (!n) return 1;   /* We don't know the reported node. */
2078         if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
2079 
2080         /* If in our current config the node is a slave, set it as a master. */
2081         if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
2082 
2083         /* Update the node's configEpoch. */
2084         n->configEpoch = reportedConfigEpoch;
2085         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2086                              CLUSTER_TODO_FSYNC_CONFIG);
2087 
2088         /* Check the bitmap of served slots and update our
2089          * config accordingly. */
2090         clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
2091             hdr->data.update.nodecfg.slots);
2092     } else if (type == CLUSTERMSG_TYPE_MODULE) {
2093         if (!sender) return 1;  /* Protect the module from unknown nodes. */
2094         /* We need to route this message back to the right module subscribed
2095          * for the right message type. */
2096         uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
2097         uint32_t len = ntohl(hdr->data.module.msg.len);
2098         uint8_t type = hdr->data.module.msg.type;
2099         unsigned char *payload = hdr->data.module.msg.bulk_data;
2100         moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
2101     } else {
2102         serverLog(LL_WARNING,"Received unknown packet type: %d", type);
2103     }
2104     return 1;
2105 }
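/* [Editor's note] The length-validation idiom repeated at the top of
 * clusterProcessPacket(), factored into a hypothetical helper: a fixed-size
 * message is valid only if totlen equals the header size (clusterMsg minus
 * the unused body union) plus its own body size, exactly. */
static int clusterMsgLenOk(uint32_t totlen, size_t bodylen) {
    uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    explen += (uint32_t) bodylen;
    return totlen == explen; /* e.g. bodylen = sizeof(clusterMsgDataFail). */
}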
2106 
2107 /* This function is called when we detect that the link with this node is
2108    lost. We set the node as no longer connected. The Cluster Cron will
2109    detect the missing connection and will try to get it connected again.
2110 
2111    Instead, if the node is a temporary node used to accept a query, we
2112    completely free the node on error. */
2113 void handleLinkIOError(clusterLink *link) {
2114     freeClusterLink(link);
2115 }
2116 
2117 /* Send data. This is handled using a trivial send buffer that gets
2118  * consumed by write(). We don't try to optimize this for speed too much
2119  * as this is a very low traffic channel. */
2120 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2121     clusterLink *link = (clusterLink*) privdata;
2122     ssize_t nwritten;
2123     UNUSED(el);
2124     UNUSED(mask);
2125 
2126     nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
2127     if (nwritten <= 0) {
2128         serverLog(LL_DEBUG,"I/O error writing to node link: %s",
2129             (nwritten == -1) ? strerror(errno) : "short write");
2130         handleLinkIOError(link);
2131         return;
2132     }
2133     sdsrange(link->sndbuf,nwritten,-1);
2134     if (sdslen(link->sndbuf) == 0)
2135         aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
2136 }
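/* [Editor's note] Why the sdsrange() above is needed: write() on a
 * non-blocking socket may accept only a prefix of the buffer, so the
 * handler trims exactly nwritten bytes and keeps the writable event armed
 * until the buffer drains. A generic sketch of that pattern (hypothetical
 * helper; the real handler writes once per event instead of looping): */
static ssize_t writePending(int fd, const char *buf, size_t len) {
    size_t off = 0;
    while (off < len) {
        ssize_t n = write(fd, buf+off, len-off);
        if (n == -1 && errno == EAGAIN) break; /* Kernel buffer full: retry later. */
        if (n <= 0) return -1;                 /* Real I/O error or closed peer. */
        off += (size_t) n;
    }
    return (ssize_t) off; /* Bytes handed to the kernel so far. */
}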
2137 
2138 /* Read data. Try to read the first field of the header first to check the
2139  * full length of the packet. When a whole packet is in memory this function
2140  * will call the function to process the packet, and so forth. */
2141 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2142     char buf[sizeof(clusterMsg)];
2143     ssize_t nread;
2144     clusterMsg *hdr;
2145     clusterLink *link = (clusterLink*) privdata;
2146     unsigned int readlen, rcvbuflen;
2147     UNUSED(el);
2148     UNUSED(mask);
2149 
2150     while(1) { /* Read as long as there is data to read. */
2151         rcvbuflen = sdslen(link->rcvbuf);
2152         if (rcvbuflen < 8) {
2153             /* First, obtain the first 8 bytes to get the full message
2154              * length. */
2155             readlen = 8 - rcvbuflen;
2156         } else {
2157             /* Finally read the full message. */
2158             hdr = (clusterMsg*) link->rcvbuf;
2159             if (rcvbuflen == 8) {
2160                 /* Perform some sanity check on the message signature
2161                  * and length. */
2162                 if (memcmp(hdr->sig,"RCmb",4) != 0 ||
2163                     ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
2164                 {
2165                     serverLog(LL_WARNING,
2166                         "Bad message length or signature received "
2167                         "from Cluster bus.");
2168                     handleLinkIOError(link);
2169                     return;
2170                 }
2171             }
2172             readlen = ntohl(hdr->totlen) - rcvbuflen;
2173             if (readlen > sizeof(buf)) readlen = sizeof(buf);
2174         }
2175 
2176         nread = read(fd,buf,readlen);
2177         if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */
2178 
2179         if (nread <= 0) {
2180             /* I/O error... */
2181             serverLog(LL_DEBUG,"I/O error reading from node link: %s",
2182                 (nread == 0) ? "connection closed" : strerror(errno));
2183             handleLinkIOError(link);
2184             return;
2185         } else {
2186             /* Read data and recast the pointer to the new buffer. */
2187             link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
2188             hdr = (clusterMsg*) link->rcvbuf;
2189             rcvbuflen += nread;
2190         }
2191 
2192         /* Total length obtained? Process this packet. */
2193         if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
2194             if (clusterProcessPacket(link)) {
2195                 sdsfree(link->rcvbuf);
2196                 link->rcvbuf = sdsempty();
2197             } else {
2198                 return; /* Link no longer valid. */
2199             }
2200         }
2201     }
2202 }
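/* [Editor's note] The framing rule the reader above implements, as a
 * stand-alone sketch: the first 8 bytes carry the "RCmb" signature plus the
 * big-endian total length, so those 8 bytes alone tell the parser how much
 * more to wait for. Hypothetical helper over an in-memory buffer; returns
 * the bytes still missing, 0 when a whole packet is buffered, -1 on bad
 * input. */
static long clusterFrameRemaining(const unsigned char *buf, size_t buflen) {
    uint32_t totlen;
    if (buflen < 8) return 8 - (long) buflen;  /* Need the fixed prefix first. */
    if (memcmp(buf,"RCmb",4) != 0) return -1;  /* Bad signature. */
    memcpy(&totlen,buf+4,4);                   /* totlen follows sig[4]. */
    totlen = ntohl(totlen);
    if (totlen < buflen) return -1;            /* More data than declared. */
    return (long)(totlen - buflen);
}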
2203 
2204 /* Put stuff into the send buffer.
2205  *
2206  * It is guaranteed that this function will never have as a side effect
2207  * the link to be invalidated, so it is safe to call this function
2208  * from event handlers that will do stuff with the same link later. */
2209 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
2210     if (sdslen(link->sndbuf) == 0 && msglen != 0)
2211         aeCreateFileEvent(server.el,link->fd,AE_WRITABLE|AE_BARRIER,
2212                     clusterWriteHandler,link);
2213 
2214     link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
2215 
2216     /* Populate sent messages stats. */
2217     clusterMsg *hdr = (clusterMsg*) msg;
2218     uint16_t type = ntohs(hdr->type);
2219     if (type < CLUSTERMSG_TYPE_COUNT)
2220         server.cluster->stats_bus_messages_sent[type]++;
2221 }
2222 
2223 /* Send a message to all the nodes that are part of the cluster having
2224  * a connected link.
2225  *
2226  * It is guaranteed that this function will never have as a side effect
2227  * some node->link to be invalidated, so it is safe to call this function
2228  * from event handlers that will do stuff with node links later. */
2229 void clusterBroadcastMessage(void *buf, size_t len) {
2230     dictIterator *di;
2231     dictEntry *de;
2232 
2233     di = dictGetSafeIterator(server.cluster->nodes);
2234     while((de = dictNext(di)) != NULL) {
2235         clusterNode *node = dictGetVal(de);
2236 
2237         if (!node->link) continue;
2238         if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
2239             continue;
2240         clusterSendMessage(node->link,buf,len);
2241     }
2242     dictReleaseIterator(di);
2243 }
2244 
2245 /* Build the message header. hdr must point to a buffer at least
2246  * sizeof(clusterMsg) bytes in size. */
2247 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
2248     int totlen = 0;
2249     uint64_t offset;
2250     clusterNode *master;
2251 
2252     /* If this node is a master, we send its slots bitmap and configEpoch.
2253      * If this node is a slave we send the master's information instead (the
2254      * node is flagged as slave so the receiver knows that it is NOT really
2255      * in charge for this slots. */
2256     master = (nodeIsSlave(myself) && myself->slaveof) ?
2257               myself->slaveof : myself;
2258 
2259     memset(hdr,0,sizeof(*hdr));
2260     hdr->ver = htons(CLUSTER_PROTO_VER);
2261     hdr->sig[0] = 'R';
2262     hdr->sig[1] = 'C';
2263     hdr->sig[2] = 'm';
2264     hdr->sig[3] = 'b';
2265     hdr->type = htons(type);
2266     memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
2267 
2268     /* If cluster-announce-ip option is enabled, force the receivers of our
2269      * packets to use the specified address for this node. Otherwise if the
2270      * first byte is zero, they'll do auto discovery. */
2271     memset(hdr->myip,0,NET_IP_STR_LEN);
2272     if (server.cluster_announce_ip) {
2273         strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
2274         hdr->myip[NET_IP_STR_LEN-1] = '\0';
2275     }
2276 
2277     /* Handle cluster-announce-port as well. */
2278     int announced_port = server.cluster_announce_port ?
2279                          server.cluster_announce_port : server.port;
2280     int announced_cport = server.cluster_announce_bus_port ?
2281                           server.cluster_announce_bus_port :
2282                           (server.port + CLUSTER_PORT_INCR);
2283 
2284     memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
2285     memset(hdr->slaveof,0,CLUSTER_NAMELEN);
2286     if (myself->slaveof != NULL)
2287         memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
2288     hdr->port = htons(announced_port);
2289     hdr->cport = htons(announced_cport);
2290     hdr->flags = htons(myself->flags);
2291     hdr->state = server.cluster->state;
2292 
2293     /* Set the currentEpoch and configEpochs. */
2294     hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
2295     hdr->configEpoch = htonu64(master->configEpoch);
2296 
2297     /* Set the replication offset. */
2298     if (nodeIsSlave(myself))
2299         offset = replicationGetSlaveOffset();
2300     else
2301         offset = server.master_repl_offset;
2302     hdr->offset = htonu64(offset);
2303 
2304     /* Set the message flags. */
2305     if (nodeIsMaster(myself) && server.cluster->mf_end)
2306         hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
2307 
2308     /* Compute the message length for certain messages. For other messages
2309      * this is up to the caller. */
2310     if (type == CLUSTERMSG_TYPE_FAIL) {
2311         totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2312         totlen += sizeof(clusterMsgDataFail);
2313     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2314         totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2315         totlen += sizeof(clusterMsgDataUpdate);
2316     }
2317     hdr->totlen = htonl(totlen);
2318     /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
2319 }
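/* [Editor's note] htonu64()/ntohu64() used above come from endianconv.h
 * (included at the top of this file). A portable equivalent built byte by
 * byte, independent of host endianness, for readers without that header;
 * the name is hypothetical: */
static uint64_t toNet64(uint64_t v) {
    unsigned char b[8];
    int i;
    for (i = 0; i < 8; i++)
        b[i] = (unsigned char)(v >> (56 - 8*i)); /* Most significant byte first. */
    uint64_t n;
    memcpy(&n,b,8);
    return n; /* The big-endian byte string, reinterpreted as uint64_t. */
}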
2320 
2321 /* Return non zero if the node is already present in the gossip section of the
2322  * message pointed to by 'hdr', which has 'count' gossip entries. Otherwise
2323  * zero is returned. Helper for clusterSendPing(). */
2324 int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
2325     int j;
2326     for (j = 0; j < count; j++) {
2327         if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
2328                 CLUSTER_NAMELEN) == 0) break;
2329     }
2330     return j != count;
2331 }
2332 
2333 /* Set the i-th entry of the gossip section in the message pointed to by 'hdr'
2334  * to the info of the specified node 'n'. */
2335 void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
2336     clusterMsgDataGossip *gossip;
2337     gossip = &(hdr->data.ping.gossip[i]);
2338     memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
2339     gossip->ping_sent = htonl(n->ping_sent/1000);
2340     gossip->pong_received = htonl(n->pong_received/1000);
2341     memcpy(gossip->ip,n->ip,sizeof(n->ip));
2342     gossip->port = htons(n->port);
2343     gossip->cport = htons(n->cport);
2344     gossip->flags = htons(n->flags);
2345     gossip->notused1 = 0;
2346 }
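/* [Editor's note] Note the unit change in clusterSetGossipEntry() above:
 * ping/pong times are milliseconds locally but travel as 32-bit seconds
 * (htonl(ms/1000)); clusterProcessGossipSection() multiplies back by 1000
 * on receipt. The same conversion as a pair of hypothetical helpers: */
static uint32_t gossipTimeToWire(mstime_t ms) { return htonl((uint32_t)(ms/1000)); }
static mstime_t gossipTimeFromWire(uint32_t wire) { return ((mstime_t)ntohl(wire))*1000; }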
2347 
2348 /* Send a PING or PONG packet to the specified node, making sure to add enough
2349  * gossip information. */
2350 void clusterSendPing(clusterLink *link, int type) {
2351     unsigned char *buf;
2352     clusterMsg *hdr;
2353     int gossipcount = 0; /* Number of gossip sections added so far. */
2354     int wanted; /* Number of gossip sections we want to append if possible. */
2355     int totlen; /* Total packet length. */
2356     /* freshnodes is the max number of nodes we can hope to append at all:
2357      * nodes available minus two (ourself and the node we are sending the
2358      * message to). However in practice there may be fewer valid nodes since
2359      * nodes in handshake state or disconnected nodes are not considered. */
2360     int freshnodes = dictSize(server.cluster->nodes)-2;
2361 
2362     /* How many gossip sections do we want to add? 1/10 of the number of nodes
2363      * and anyway at least 3. Why 1/10?
2364      *
2365      * If we have N masters, with N/10 entries, and we consider that in
2366      * node_timeout we exchange with each other node at least 4 packets
2367      * (we ping in the worst case in node_timeout/2 time, and we also
2368      * receive two pings from the host), we have a total of 8 packets
2369      * in the node_timeout*2 falure reports validity time. So we have
2370      * that, for a single PFAIL node, we can expect to receive the following
2371      * number of failure reports (in the specified window of time):
2372      *
2373      * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
2374      *
2375      * PROB = probability of being featured in a single gossip entry,
2376      *        which is 1 / NUM_OF_NODES.
2377      * ENTRIES = 10.
2378      * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
2379      *
2380      * If we assume we have just masters (so num of nodes and num of masters
2381      * is the same), with 1/10 we always get over the majority, and specifically
2382      * 80% of the number of nodes, to account for many masters failing at the
2383      * same time.
2384      *
2385      * Since we have non-voting slaves that lower the probability of an entry
2386      * featuring our node, we set the number of entries per packet to
2387      * 10% of the total nodes we have. */
2388     wanted = floor(dictSize(server.cluster->nodes)/10);
2389     if (wanted < 3) wanted = 3;
2390     if (wanted > freshnodes) wanted = freshnodes;
2391 
2392     /* Include all the nodes in PFAIL state, so that failure reports are
2393      * propagated faster, helping nodes go from PFAIL to FAIL state. */
2394     int pfail_wanted = server.cluster->stats_pfail_nodes;
2395 
2396     /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
2397      * later according to the number of gossip sections we really were able
2398      * to put inside the packet. */
2399     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2400     totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
2401     /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
2402      * sizeof(clusterMsg) or more. */
2403     if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
2404     buf = zcalloc(totlen);
2405     hdr = (clusterMsg*) buf;
2406 
2407     /* Populate the header. */
2408     if (link->node && type == CLUSTERMSG_TYPE_PING)
2409         link->node->ping_sent = mstime();
2410     clusterBuildMessageHdr(hdr,type);
2411 
2412     /* Populate the gossip fields */
2413     int maxiterations = wanted*3;
2414     while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
2415         dictEntry *de = dictGetRandomKey(server.cluster->nodes);
2416         clusterNode *this = dictGetVal(de);
2417 
2418         /* Don't include this node: the whole packet header is about us
2419          * already, so we just gossip about other nodes. */
2420         if (this == myself) continue;
2421 
2422         /* PFAIL nodes will be added later. */
2423         if (this->flags & CLUSTER_NODE_PFAIL) continue;
2424 
2425         /* In the gossip section don't include:
2426          * 1) Nodes in HANDSHAKE state.
2427          * 3) Nodes with the NOADDR flag set.
2428          * 4) Disconnected nodes if they don't have configured slots.
2429          */
2430         if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
2431             (this->link == NULL && this->numslots == 0))
2432         {
2433             freshnodes--; /* Technically not correct, but saves CPU. */
2434             continue;
2435         }
2436 
2437         /* Do not add a node we already have. */
2438         if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
2439 
2440         /* Add it */
2441         clusterSetGossipEntry(hdr,gossipcount,this);
2442         freshnodes--;
2443         gossipcount++;
2444     }
2445 
2446     /* If there are PFAIL nodes, add them at the end. */
2447     if (pfail_wanted) {
2448         dictIterator *di;
2449         dictEntry *de;
2450 
2451         di = dictGetSafeIterator(server.cluster->nodes);
2452         while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
2453             clusterNode *node = dictGetVal(de);
2454             if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
2455             if (node->flags & CLUSTER_NODE_NOADDR) continue;
2456             if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
2457             clusterSetGossipEntry(hdr,gossipcount,node);
2458             freshnodes--;
2459             gossipcount++;
2460             /* We take the count of the entries we allocated, since the
2461              * PFAIL stats may not match perfectly with the current number
2462              * of PFAIL nodes. */
2463             pfail_wanted--;
2464         }
2465         dictReleaseIterator(di);
2466     }
2467 
2468     /* Ready to send... fix the totlen field and queue the message in the
2469      * output buffer. */
2470     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2471     totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
2472     hdr->count = htons(gossipcount);
2473     hdr->totlen = htonl(totlen);
2474     clusterSendMessage(link,buf,totlen);
2475     zfree(buf);
2476 }
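/* [Editor's note] The sizing arithmetic of clusterSendPing() in one place:
 * with N known nodes we aim for max(3, N/10) gossip entries, capped by the
 * nodes actually available, plus one entry per PFAIL node; the buffer is
 * allocated for this worst case and totlen is fixed up after the fill
 * loop. Hypothetical helper: */
static int gossipEntriesToAllocate(int numnodes, int pfail_nodes) {
    int freshnodes = numnodes-2; /* All nodes minus myself and the receiver. */
    int wanted = numnodes/10;    /* floor(N/10), as in the comment above. */
    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;
    return wanted + pfail_nodes; /* Worst-case number of entries. */
}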
2477 
2478 /* Send a PONG packet to every connected node that's not in handshake state
2479  * and for which we have a valid link.
2480  *
2481  * In Redis Cluster pongs are not used just for failure detection, but also
2482  * to carry important configuration information. So broadcasting a pong is
2483  * useful when something changes in the configuration and we want to make
2484  * the cluster aware ASAP (for instance after a slave promotion).
2485  *
2486  * The 'target' argument specifies the receiving instances using the
2487  * defines below:
2488  *
2489  * CLUSTER_BROADCAST_ALL -> All known instances.
2490  * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
2491  */
2492 #define CLUSTER_BROADCAST_ALL 0
2493 #define CLUSTER_BROADCAST_LOCAL_SLAVES 1
2494 void clusterBroadcastPong(int target) {
2495     dictIterator *di;
2496     dictEntry *de;
2497 
2498     di = dictGetSafeIterator(server.cluster->nodes);
2499     while((de = dictNext(di)) != NULL) {
2500         clusterNode *node = dictGetVal(de);
2501 
2502         if (!node->link) continue;
2503         if (node == myself || nodeInHandshake(node)) continue;
2504         if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
2505             int local_slave =
2506                 nodeIsSlave(node) && node->slaveof &&
2507                 (node->slaveof == myself || node->slaveof == myself->slaveof);
2508             if (!local_slave) continue;
2509         }
2510         clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
2511     }
2512     dictReleaseIterator(di);
2513 }
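
/* Editorial note: in this file clusterBroadcastPong() is called with
 * CLUSTER_BROADCAST_ALL by clusterFailoverReplaceYourMaster() to announce
 * the role switch, and with CLUSTER_BROADCAST_LOCAL_SLAVES by
 * clusterHandleSlaveFailover() so that sibling replicas can compare
 * replication offsets before an election. */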
2514 
2515 /* Send a PUBLISH message.
2516  *
2517  * If link is NULL, then the message is broadcasted to the whole cluster. */
2518 void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
2519     unsigned char buf[sizeof(clusterMsg)], *payload;
2520     clusterMsg *hdr = (clusterMsg*) buf;
2521     uint32_t totlen;
2522     uint32_t channel_len, message_len;
2523 
2524     channel = getDecodedObject(channel);
2525     message = getDecodedObject(message);
2526     channel_len = sdslen(channel->ptr);
2527     message_len = sdslen(message->ptr);
2528 
2529     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
2530     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2531     totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
2532 
2533     hdr->data.publish.msg.channel_len = htonl(channel_len);
2534     hdr->data.publish.msg.message_len = htonl(message_len);
2535     hdr->totlen = htonl(totlen);
2536 
2537     /* Try to use the local buffer if possible */
2538     if (totlen < sizeof(buf)) {
2539         payload = buf;
2540     } else {
2541         payload = zmalloc(totlen);
2542         memcpy(payload,hdr,sizeof(*hdr));
2543         hdr = (clusterMsg*) payload;
2544     }
2545     memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
2546     memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
2547         message->ptr,sdslen(message->ptr));
2548 
2549     if (link)
2550         clusterSendMessage(link,payload,totlen);
2551     else
2552         clusterBroadcastMessage(payload,totlen);
2553 
2554     decrRefCount(channel);
2555     decrRefCount(message);
2556     if (payload != buf) zfree(payload);
2557 }
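
/* Editorial sketch (hypothetical helper, not part of the original file):
 * the PUBLISH payload is the channel bytes immediately followed by the
 * message bytes inside bulk_data, with both lengths in network byte
 * order; the '- 8' above subtracts the placeholder bulk_data[8] assumed
 * to be declared in clusterMsgDataPublish, so the variable part is
 * counted exactly once. A receiver could recover the two strings so: */
static void clusterReadPublishSketch(clusterMsg *hdr,
                                     char **chan, uint32_t *chan_len,
                                     char **msg, uint32_t *msg_len)
{
    *chan_len = ntohl(hdr->data.publish.msg.channel_len);
    *msg_len = ntohl(hdr->data.publish.msg.message_len);
    *chan = (char*)hdr->data.publish.msg.bulk_data;
    *msg = (char*)hdr->data.publish.msg.bulk_data + *chan_len;
}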
2558 
2559 /* Send a FAIL message to all the nodes we are able to contact.
2560  * The FAIL message is sent when we detect that a node is failing
2561  * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
2562  * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
2563  * nodes to do the same ASAP. */
2564 void clusterSendFail(char *nodename) {
2565     unsigned char buf[sizeof(clusterMsg)];
2566     clusterMsg *hdr = (clusterMsg*) buf;
2567 
2568     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
2569     memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
2570     clusterBroadcastMessage(buf,ntohl(hdr->totlen));
2571 }
2572 
2573 /* Send an UPDATE message to the specified link carrying the specified 'node'
2574  * slots configuration. The node name, slots bitmap, and configEpoch info
2575  * are included. */
2576 void clusterSendUpdate(clusterLink *link, clusterNode *node) {
2577     unsigned char buf[sizeof(clusterMsg)];
2578     clusterMsg *hdr = (clusterMsg*) buf;
2579 
2580     if (link == NULL) return;
2581     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
2582     memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
2583     hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
2584     memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
2585     clusterSendMessage(link,buf,ntohl(hdr->totlen));
2586 }
2587 
2588 /* Send a MODULE message.
2589  *
2590  * If link is NULL, then the message is broadcasted to the whole cluster. */
2591 void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
2592                        unsigned char *payload, uint32_t len) {
2593     unsigned char buf[sizeof(clusterMsg)], *heapbuf;
2594     clusterMsg *hdr = (clusterMsg*) buf;
2595     uint32_t totlen;
2596 
2597     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
2598     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2599     totlen += sizeof(clusterMsgModule) - 3 + len;
2600 
2601     hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
2602     hdr->data.module.msg.type = type;
2603     hdr->data.module.msg.len = htonl(len);
2604     hdr->totlen = htonl(totlen);
2605 
2606     /* Try to use the local buffer if possible */
2607     if (totlen < sizeof(buf)) {
2608         heapbuf = buf;
2609     } else {
2610         heapbuf = zmalloc(totlen);
2611         memcpy(heapbuf,hdr,sizeof(*hdr));
2612         hdr = (clusterMsg*) heapbuf;
2613     }
2614     memcpy(hdr->data.module.msg.bulk_data,payload,len);
2615 
2616     if (link)
2617         clusterSendMessage(link,heapbuf,totlen);
2618     else
2619         clusterBroadcastMessage(heapbuf,totlen);
2620 
2621     if (heapbuf != buf) zfree(heapbuf);
2622 }
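
/* Editorial note: as in clusterSendPublish(), the '- 3' above subtracts
 * the placeholder bulk_data[] array declared inside clusterMsgModule
 * (assumed to be 3 bytes), so totlen counts the variable-length module
 * payload exactly once. */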
2623 
2624 /* This function gets a cluster node ID string as target, the same way the nodes
2625  * addresses are represented in the modules side, resolves the node, and sends
2626  * the message. If the target is NULL the message is broadcasted.
2627  *
2628  * The function returns C_OK if the target is valid, otherwise C_ERR is
2629  * returned. */
2630 int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) {
2631     clusterNode *node = NULL;
2632 
2633     if (target != NULL) {
2634         node = clusterLookupNode(target);
2635         if (node == NULL || node->link == NULL) return C_ERR;
2636     }
2637 
2638     clusterSendModule(target ? node->link : NULL,
2639                       module_id, type, payload, len);
2640     return C_OK;
2641 }
2642 
2643 /* -----------------------------------------------------------------------------
2644  * CLUSTER Pub/Sub support
2645  *
2646  * For now we do very little, just propagating PUBLISH messages across the whole
2647  * cluster. In the future we'll try to get smarter and avoid propagating those
2648  * messages to hosts without receivers for a given channel.
2649  * -------------------------------------------------------------------------- */
2650 void clusterPropagatePublish(robj *channel, robj *message) {
2651     clusterSendPublish(NULL, channel, message);
2652 }
2653 
2654 /* -----------------------------------------------------------------------------
2655  * SLAVE node specific functions
2656  * -------------------------------------------------------------------------- */
2657 
2658 /* This function sends a FAILOVER_AUTH_REQUEST message to every node in order to
2659  * see if there is the quorum for this slave instance to failover its failing
2660  * master.
2661  *
2662  * Note that we send the failover request to everybody, master and slave nodes,
2663  * but only the masters are supposed to reply to our query. */
2664 void clusterRequestFailoverAuth(void) {
2665     unsigned char buf[sizeof(clusterMsg)];
2666     clusterMsg *hdr = (clusterMsg*) buf;
2667     uint32_t totlen;
2668 
2669     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
2670     /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
2671      * in the header to tell the nodes receiving the message that
2672      * they should authorize the failover even if the master is working. */
2673     if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
2674     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2675     hdr->totlen = htonl(totlen);
2676     clusterBroadcastMessage(buf,totlen);
2677 }
2678 
2679 /* Send a FAILOVER_AUTH_ACK message to the specified node. */
2680 void clusterSendFailoverAuth(clusterNode *node) {
2681     unsigned char buf[sizeof(clusterMsg)];
2682     clusterMsg *hdr = (clusterMsg*) buf;
2683     uint32_t totlen;
2684 
2685     if (!node->link) return;
2686     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
2687     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2688     hdr->totlen = htonl(totlen);
2689     clusterSendMessage(node->link,buf,totlen);
2690 }
2691 
2692 /* Send a MFSTART message to the specified node. */
2693 void clusterSendMFStart(clusterNode *node) {
2694     unsigned char buf[sizeof(clusterMsg)];
2695     clusterMsg *hdr = (clusterMsg*) buf;
2696     uint32_t totlen;
2697 
2698     if (!node->link) return;
2699     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
2700     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2701     hdr->totlen = htonl(totlen);
2702     clusterSendMessage(node->link,buf,totlen);
2703 }
2704 
2705 /* Vote for the node asking for our vote if there are the conditions. */
2706 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
2707     clusterNode *master = node->slaveof;
2708     uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
2709     uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
2710     unsigned char *claimed_slots = request->myslots;
2711     int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
2712     int j;
2713 
2714     /* If we are not a master serving at least 1 slot, we don't have the
2715      * right to vote, as the cluster size in Redis Cluster is the number
2716      * of masters serving at least one slot, and the quorum is half the
2717      * cluster size, plus one. */
2718     if (nodeIsSlave(myself) || myself->numslots == 0) return;
2719 
2720     /* Request epoch must be >= our currentEpoch.
2721      * Note that it is impossible for it to actually be greater since
2722      * our currentEpoch was updated as a side effect of receiving this
2723      * request, if the request epoch was greater. */
2724     if (requestCurrentEpoch < server.cluster->currentEpoch) {
2725         serverLog(LL_WARNING,
2726             "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
2727             node->name,
2728             (unsigned long long) requestCurrentEpoch,
2729             (unsigned long long) server.cluster->currentEpoch);
2730         return;
2731     }
2732 
2733     /* I already voted for this epoch? Return ASAP. */
2734     if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
2735         serverLog(LL_WARNING,
2736                 "Failover auth denied to %.40s: already voted for epoch %llu",
2737                 node->name,
2738                 (unsigned long long) server.cluster->currentEpoch);
2739         return;
2740     }
2741 
2742     /* Node must be a slave and its master down.
2743      * The master can be non failing if the request is flagged
2744      * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
2745     if (nodeIsMaster(node) || master == NULL ||
2746         (!nodeFailed(master) && !force_ack))
2747     {
2748         if (nodeIsMaster(node)) {
2749             serverLog(LL_WARNING,
2750                     "Failover auth denied to %.40s: it is a master node",
2751                     node->name);
2752         } else if (master == NULL) {
2753             serverLog(LL_WARNING,
2754                     "Failover auth denied to %.40s: I don't know its master",
2755                     node->name);
2756         } else if (!nodeFailed(master)) {
2757             serverLog(LL_WARNING,
2758                     "Failover auth denied to %.40s: its master is up",
2759                     node->name);
2760         }
2761         return;
2762     }
2763 
2764     /* Refuse the vote if we already voted for a slave of this master
2765      * within two times the node timeout. This is not strictly needed for
2766      * correctness of the algorithm but makes the base case more linear. */
2767     if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
2768     {
2769         serverLog(LL_WARNING,
2770                 "Failover auth denied to %.40s: "
2771                 "can't vote about this master before %lld milliseconds",
2772                 node->name,
2773                 (long long) ((server.cluster_node_timeout*2)-
2774                              (mstime() - node->slaveof->voted_time)));
2775         return;
2776     }
2777 
2778     /* The slave requesting the vote must have a configEpoch for the claimed
2779      * slots that is >= the one of the masters currently serving the same
2780      * slots in the current configuration. */
2781     for (j = 0; j < CLUSTER_SLOTS; j++) {
2782         if (bitmapTestBit(claimed_slots, j) == 0) continue;
2783         if (server.cluster->slots[j] == NULL ||
2784             server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
2785         {
2786             continue;
2787         }
2788         /* If we reached this point we found a slot that in our current slots
2789          * is served by a master with a greater configEpoch than the one claimed
2790          * by the slave requesting our vote. Refuse to vote for this slave. */
2791         serverLog(LL_WARNING,
2792                 "Failover auth denied to %.40s: "
2793                 "slot %d epoch (%llu) > reqEpoch (%llu)",
2794                 node->name, j,
2795                 (unsigned long long) server.cluster->slots[j]->configEpoch,
2796                 (unsigned long long) requestConfigEpoch);
2797         return;
2798     }
2799 
2800     /* We can vote for this slave. */
2801     server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
2802     node->slaveof->voted_time = mstime();
2803     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
2804     clusterSendFailoverAuth(node);
2805     serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
2806         node->name, (unsigned long long) server.cluster->currentEpoch);
2807 }
2808 
2809 /* This function returns the "rank" of this instance, a slave, in the context
2810  * of its master-slaves ring. The rank of the slave is given by the number of
2811  * other slaves for the same master that have a better replication offset
2812  * compared to the local one (better means greater, so they claim more data).
2813  *
2814  * A slave with rank 0 is the one with the greatest (most up to date)
2815  * replication offset, and so forth. Note that because of how the rank is
2816  * computed, multiple slaves may have the same rank if they have the same offset.
2817  *
2818  * The slave rank is used to add a delay to start an election in order to
2819  * get voted and replace a failing master. Slaves with better replication
2820  * offsets are more likely to win. */
2821 int clusterGetSlaveRank(void) {
2822     long long myoffset;
2823     int j, rank = 0;
2824     clusterNode *master;
2825 
2826     serverAssert(nodeIsSlave(myself));
2827     master = myself->slaveof;
2828     if (master == NULL) return 0; /* Never called by slaves without master. */
2829 
2830     myoffset = replicationGetSlaveOffset();
2831     for (j = 0; j < master->numslaves; j++)
2832         if (master->slaves[j] != myself &&
2833             !nodeCantFailover(master->slaves[j]) &&
2834             master->slaves[j]->repl_offset > myoffset) rank++;
2835     return rank;
2836 }
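
/* Editorial example: with three replicas of the same master at
 * replication offsets 100, 90 and 90, the replica at offset 100 has
 * rank 0 while both replicas at offset 90 share rank 1, so each of the
 * latter delays its election start by one extra second. */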
2837 
2838 /* This function is called by clusterHandleSlaveFailover() in order to
2839  * let the slave log why it is not able to failover. Sometimes the
2840  * conditions are not met, but since the failover function is called again
2841  * and again, we can't log the same things continuously.
2842  *
2843  * This function works by logging only if a given set of conditions are
2844  * true:
2845  *
2846  * 1) The reason for which the failover can't be initiated changed.
2847  *    The reasons also include a NONE reason, which we reset the state to
2848  *    when the slave finds that its master is fine (no FAIL flag).
2849  * 2) Also, the log is emitted again if the master is still down and
2850  *    the reason for not failing over is still the same, but more than
2851  *    CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
2852  * 3) Finally, the function only logs if the slave is down for more than
2853  *    five seconds + NODE_TIMEOUT. This way nothing is logged when a
2854  *    failover starts in a reasonable time.
2855  *
2856  * The function is called with the reason why the slave can't failover
2857  * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
2858  *
2859  * The function is guaranteed to be called only if 'myself' is a slave. */
2860 void clusterLogCantFailover(int reason) {
2861     char *msg;
2862     static time_t lastlog_time = 0;
2863     mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
2864 
2865     /* Don't log if we have the same reason for some time. */
2866     if (reason == server.cluster->cant_failover_reason &&
2867         time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
2868         return;
2869 
2870     server.cluster->cant_failover_reason = reason;
2871 
2872     /* We also don't emit any log if the master failed not long ago: the
2873      * goal of this function is to log slaves in a stalled condition for
2874      * a long time. */
2875     if (myself->slaveof &&
2876         nodeFailed(myself->slaveof) &&
2877         (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;
2878 
2879     switch(reason) {
2880     case CLUSTER_CANT_FAILOVER_DATA_AGE:
2881         msg = "Disconnected from master for longer than allowed. "
2882               "Please check the 'cluster-replica-validity-factor' configuration "
2883               "option.";
2884         break;
2885     case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
2886         msg = "Waiting the delay before I can start a new failover.";
2887         break;
2888     case CLUSTER_CANT_FAILOVER_EXPIRED:
2889         msg = "Failover attempt expired.";
2890         break;
2891     case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
2892         msg = "Waiting for votes, but majority still not reached.";
2893         break;
2894     default:
2895         msg = "Unknown reason code.";
2896         break;
2897     }
2898     lastlog_time = time(NULL);
2899     serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
2900 }
2901 
2902 /* This function implements the final part of automatic and manual failovers,
2903  * where the slave grabs its master's hash slots, and propagates the new
2904  * configuration.
2905  *
2906  * Note that it's up to the caller to be sure that the node got a new
2907  * configuration epoch already. */
2908 void clusterFailoverReplaceYourMaster(void) {
2909     int j;
2910     clusterNode *oldmaster = myself->slaveof;
2911 
2912     if (nodeIsMaster(myself) || oldmaster == NULL) return;
2913 
2914     /* 1) Turn this node into a master. */
2915     clusterSetNodeAsMaster(myself);
2916     replicationUnsetMaster();
2917 
2918     /* 2) Claim all the slots assigned to our master. */
2919     for (j = 0; j < CLUSTER_SLOTS; j++) {
2920         if (clusterNodeGetSlotBit(oldmaster,j)) {
2921             clusterDelSlot(j);
2922             clusterAddSlot(myself,j);
2923         }
2924     }
2925 
2926     /* 3) Update state and save config. */
2927     clusterUpdateState();
2928     clusterSaveConfigOrDie(1);
2929 
2930     /* 4) Pong all the other nodes so that they can update the state
2931      *    accordingly and detect that we switched to master role. */
2932     clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
2933 
2934     /* 5) If there was a manual failover in progress, clear the state. */
2935     resetManualFailover();
2936 }
2937 
2938 /* This function is called if we are a slave node and our master serving
2939  * a non-zero amount of hash slots is in FAIL state.
2940  *
2941  * The goal of this function is:
2942  * 1) To check if we are able to perform a failover, is our data updated?
2943  * 2) Try to get elected by masters.
2944  * 3) Perform the failover informing all the other nodes.
2945  */
2946 void clusterHandleSlaveFailover(void) {
2947     mstime_t data_age;
2948     mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
2949     int needed_quorum = (server.cluster->size / 2) + 1;
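    /* Editorial example: with server.cluster->size == 5 (five masters
     * serving slots), needed_quorum is (5/2)+1 = 3 votes. */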
2950     int manual_failover = server.cluster->mf_end != 0 &&
2951                           server.cluster->mf_can_start;
2952     mstime_t auth_timeout, auth_retry_time;
2953 
2954     server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
2955 
2956     /* Compute the failover timeout (the max time we have to send votes
2957      * and wait for replies), and the failover retry time (the time to wait
2958      * before trying to get voted again).
2959      *
2960      * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
2961      * Retry is two times the Timeout.
2962      */
2963     auth_timeout = server.cluster_node_timeout*2;
2964     if (auth_timeout < 2000) auth_timeout = 2000;
2965     auth_retry_time = auth_timeout*2;
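
    /* Editorial example: with the default cluster-node-timeout of 15000
     * milliseconds, auth_timeout is 30000 ms and auth_retry_time is
     * 60000 ms, so a failed election can be retried after one minute. */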
2966 
2967     /* Preconditions to run the function, that must be met both in case
2968      * of an automatic or manual failover:
2969      * 1) We are a slave.
2970      * 2) Our master is flagged as FAIL, or this is a manual failover.
2971      * 3) The no-failover configuration is not set, or this is a
2972      *    manual failover.
2973      * 4) Our master is serving slots. */
2974     if (nodeIsMaster(myself) ||
2975         myself->slaveof == NULL ||
2976         (!nodeFailed(myself->slaveof) && !manual_failover) ||
2977         (server.cluster_slave_no_failover && !manual_failover) ||
2978         myself->slaveof->numslots == 0)
2979     {
2980         /* There are no reasons to failover, so we set the reason why we
2981          * are returning without failing over to NONE. */
2982         server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
2983         return;
2984     }
2985 
2986     /* Set data_age to the number of milliseconds we are disconnected from
2987      * the master. */
2988     if (server.repl_state == REPL_STATE_CONNECTED) {
2989         data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
2990                    * 1000;
2991     } else {
2992         data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
2993     }
2994 
2995     /* Remove the node timeout from the data age as it is fine that we are
2996      * disconnected from our master at least for the time it was down to be
2997      * flagged as FAIL, that's the baseline. */
2998     if (data_age > server.cluster_node_timeout)
2999         data_age -= server.cluster_node_timeout;
3000 
3001     /* Check if our data is recent enough according to the slave validity
3002      * factor configured by the user.
3003      *
3004      * Check bypassed for manual failovers. */
3005     if (server.cluster_slave_validity_factor &&
3006         data_age >
3007         (((mstime_t)server.repl_ping_slave_period * 1000) +
3008          (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
3009     {
3010         if (!manual_failover) {
3011             clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
3012             return;
3013         }
3014     }
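
    /* Editorial example: with repl-ping-replica-period 10 (seconds),
     * cluster-node-timeout 15000 ms and a validity factor of 10, an
     * automatic failover is refused once data_age exceeds
     * 10*1000 + 15000*10 = 160000 ms. */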
3015 
3016     /* If the previous failover attempt timed out and the retry time has
3017      * elapsed, we can set up a new one. */
3018     if (auth_age > auth_retry_time) {
3019         server.cluster->failover_auth_time = mstime() +
3020             500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
3021             random() % 500; /* Random delay between 0 and 500 milliseconds. */
3022         server.cluster->failover_auth_count = 0;
3023         server.cluster->failover_auth_sent = 0;
3024         server.cluster->failover_auth_rank = clusterGetSlaveRank();
3025         /* We add another delay that is proportional to the slave rank.
3026          * Specifically 1 second * rank. This way slaves that probably have
3027          * a less updated replication offset are penalized. */
3028         server.cluster->failover_auth_time +=
3029             server.cluster->failover_auth_rank * 1000;
3030         /* However if this is a manual failover, no delay is needed. */
3031         if (server.cluster->mf_end) {
3032             server.cluster->failover_auth_time = mstime();
3033             server.cluster->failover_auth_rank = 0;
3034             clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
3035         }
3036         serverLog(LL_WARNING,
3037             "Start of election delayed for %lld milliseconds "
3038             "(rank #%d, offset %lld).",
3039             server.cluster->failover_auth_time - mstime(),
3040             server.cluster->failover_auth_rank,
3041             replicationGetSlaveOffset());
3042         /* Now that we have a scheduled election, broadcast our offset
3043          * to all the other slaves so that they'll update their offsets
3044          * if our offset is better. */
3045         clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
3046         return;
3047     }
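
    /* Editorial example: the election start computed above is
     * now + 500 + random()%500 + rank*1000 milliseconds, so a replica
     * with rank 2 waits between 2500 and just under 3000 ms before
     * asking for votes, while a manual failover skips the delay. */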
3048 
3049     /* It is possible that we received more updated offsets from other
3050      * slaves for the same master since we computed our election delay.
3051      * Update the delay if our rank changed.
3052      *
3053      * Not performed if this is a manual failover. */
3054     if (server.cluster->failover_auth_sent == 0 &&
3055         server.cluster->mf_end == 0)
3056     {
3057         int newrank = clusterGetSlaveRank();
3058         if (newrank > server.cluster->failover_auth_rank) {
3059             long long added_delay =
3060                 (newrank - server.cluster->failover_auth_rank) * 1000;
3061             server.cluster->failover_auth_time += added_delay;
3062             server.cluster->failover_auth_rank = newrank;
3063             serverLog(LL_WARNING,
3064                 "Replica rank updated to #%d, added %lld milliseconds of delay.",
3065                 newrank, added_delay);
3066         }
3067     }
3068 
3069     /* Return ASAP if we still can't start the election. */
3070     if (mstime() < server.cluster->failover_auth_time) {
3071         clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
3072         return;
3073     }
3074 
3075     /* Return ASAP if the election is too old to be valid. */
3076     if (auth_age > auth_timeout) {
3077         clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
3078         return;
3079     }
3080 
3081     /* Ask for votes if needed. */
3082     if (server.cluster->failover_auth_sent == 0) {
3083         server.cluster->currentEpoch++;
3084         server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
3085         serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
3086             (unsigned long long) server.cluster->currentEpoch);
3087         clusterRequestFailoverAuth();
3088         server.cluster->failover_auth_sent = 1;
3089         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
3090                              CLUSTER_TODO_UPDATE_STATE|
3091                              CLUSTER_TODO_FSYNC_CONFIG);
3092         return; /* Wait for replies. */
3093     }
3094 
3095     /* Check if we reached the quorum. */
3096     if (server.cluster->failover_auth_count >= needed_quorum) {
3097         /* We have the quorum, we can finally failover the master. */
3098 
3099         serverLog(LL_WARNING,
3100             "Failover election won: I'm the new master.");
3101 
3102         /* Update my configEpoch to the epoch of the election. */
3103         if (myself->configEpoch < server.cluster->failover_auth_epoch) {
3104             myself->configEpoch = server.cluster->failover_auth_epoch;
3105             serverLog(LL_WARNING,
3106                 "configEpoch set to %llu after successful failover",
3107                 (unsigned long long) myself->configEpoch);
3108         }
3109 
3110         /* Take responsibility for the cluster slots. */
3111         clusterFailoverReplaceYourMaster();
3112     } else {
3113         clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
3114     }
3115 }
3116 
3117 /* -----------------------------------------------------------------------------
3118  * CLUSTER slave migration
3119  *
3120  * Slave migration is the process that allows a slave of a master that is
3121  * already covered by at least one other slave, to "migrate" to a master that
3122  * is orphaned, that is, left with no working slaves.
3123  * ------------------------------------------------------------------------- */
3124 
3125 /* This function is responsible for deciding if this replica should be migrated
3126  * to a different (orphaned) master. It is called by the clusterCron() function
3127  * only if:
3128  *
3129  * 1) We are a slave node.
3130  * 2) It was detected that there is at least one orphaned master in
3131  *    the cluster.
3132  * 3) We are a slave of one of the masters with the greatest number of
3133  *    slaves.
3134  *
3135  * These checks are performed by the caller, since it needs to iterate
3136  * the nodes anyway; this way we only spend time in
3137  * clusterHandleSlaveMigration() if definitely needed.
3138  *
3139  * The function is called with a pre-computed max_slaves, that is the max
3140  * number of working (not in FAIL state) slaves for a single master.
3141  *
3142  * Additional conditions for migration are examined inside the function.
3143  */
3144 void clusterHandleSlaveMigration(int max_slaves) {
3145     int j, okslaves = 0;
3146     clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
3147     dictIterator *di;
3148     dictEntry *de;
3149 
3150     /* Step 1: Don't migrate if the cluster state is not ok. */
3151     if (server.cluster->state != CLUSTER_OK) return;
3152 
3153     /* Step 2: Don't migrate if my master will not be left with at least
3154      *         'migration-barrier' slaves after my migration. */
3155     if (mymaster == NULL) return;
3156     for (j = 0; j < mymaster->numslaves; j++)
3157         if (!nodeFailed(mymaster->slaves[j]) &&
3158             !nodeTimedOut(mymaster->slaves[j])) okslaves++;
3159     if (okslaves <= server.cluster_migration_barrier) return;
3160 
3161     /* Step 3: Identify a candidate for migration, and check if among the
3162      * masters with the greatest number of ok slaves, I'm the one with the
3163      * smallest node ID (the "candidate slave").
3164      *
3165      * Note: this means that eventually a replica migration will occur
3166      * since slaves that are reachable again always have their FAIL flag
3167      * cleared, so eventually there must be a candidate. At the same time
3168      * this does not mean that there are no race conditions possible (two
3169      * slaves migrating at the same time), but this is unlikely to
3170      * happen, and harmless when it happens. */
3171     candidate = myself;
3172     di = dictGetSafeIterator(server.cluster->nodes);
3173     while((de = dictNext(di)) != NULL) {
3174         clusterNode *node = dictGetVal(de);
3175         int okslaves = 0, is_orphaned = 1;
3176 
3177         /* We want to migrate only if this master is working, orphaned, and
3178          * used to have slaves, or failed over a master that had slaves
3179          * (MIGRATE_TO flag). This way we only migrate to instances that were
3180          * supposed to have replicas. */
3181         if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
3182         if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;
3183 
3184         /* Check number of working slaves. */
3185         if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
3186         if (okslaves > 0) is_orphaned = 0;
3187 
3188         if (is_orphaned) {
3189             if (!target && node->numslots > 0) target = node;
3190 
3191             /* Track the starting time of the orphaned condition for this
3192              * master. */
3193             if (!node->orphaned_time) node->orphaned_time = mstime();
3194         } else {
3195             node->orphaned_time = 0;
3196         }
3197 
3198         /* Check if I'm the slave candidate for the migration: attached
3199          * to a master with the maximum number of slaves and with the smallest
3200          * node ID. */
3201         if (okslaves == max_slaves) {
3202             for (j = 0; j < node->numslaves; j++) {
3203                 if (memcmp(node->slaves[j]->name,
3204                            candidate->name,
3205                            CLUSTER_NAMELEN) < 0)
3206                 {
3207                     candidate = node->slaves[j];
3208                 }
3209             }
3210         }
3211     }
3212     dictReleaseIterator(di);
3213 
3214     /* Step 4: perform the migration if there is a target, and if I'm the
3215      * candidate, but only if the master is continuously orphaned for a
3216      * couple of seconds, so that during failovers, we give some time to
3217      * the natural slaves of this instance to advertise their switch from
3218      * the old master to the new one. */
3219     if (target && candidate == myself &&
3220         (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY &&
3221        !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
3222     {
3223         serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
3224             target->name);
3225         clusterSetMaster(target);
3226     }
3227 }
3228 
3229 /* -----------------------------------------------------------------------------
3230  * CLUSTER manual failover
3231  *
3232  * These are the important steps performed by slaves during a manual failover:
3233  * 1) User send CLUSTER FAILOVER command. The failover state is initialized
3234  *    setting mf_end to the millisecond unix time at which we'll abort the
3235  *    attempt.
3236  * 2) Slave sends a MFSTART message to the master requesting to pause clients
3237  *    for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
3238  *    When master is paused for manual failover, it also starts to flag
3239  *    packets with CLUSTERMSG_FLAG0_PAUSED.
3240  * 3) Slave waits for master to send its replication offset flagged as PAUSED.
3241  * 4) If slave received the offset from the master, and its offset matches,
3242  *    mf_can_start is set to 1, and clusterHandleSlaveFailover() will perform
3243  *    the failover as usually, with the difference that the vote request
3244  *    will be modified to force masters to vote for a slave that has a
3245  *    working master.
3246  *
3247  * From the point of view of the master things are simpler: when a
3248  * PAUSE_CLIENTS packet is received the master sets mf_end as well and
3249  * the sender in mf_slave. During the time limit for the manual failover
3250  * the master will just send PINGs more often to this slave, flagged with
3251  * the PAUSED flag, so that the slave will set mf_master_offset when receiving
3252  * a packet from the master with this flag set.
3253  *
3254  * The goal of the manual failover is to perform a fast failover without
3255  * data loss due to the asynchronous master-slave replication.
3256  * -------------------------------------------------------------------------- */
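
/* Editorial sketch of the message flow described above (illustrative):
 *
 *   replica                                  master
 *      | -- MFSTART ----------------------->  |  pauses clients, sets mf_end
 *      | <- PINGs with CLUSTERMSG_FLAG0_PAUSED |  carry the master offset
 *      |    (offsets match: mf_can_start = 1)
 *      | -- FAILOVER_AUTH_REQUEST + FORCEACK -> all masters
 */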
3257 
3258 /* Reset the manual failover state. This works for both masters and slaves,
3259  * as all the state about manual failover is cleared.
3260  *
3261  * The function can be used both to initialize the manual failover state at
3262  * startup or to abort a manual failover in progress. */
3263 void resetManualFailover(void) {
3264     if (server.cluster->mf_end && clientsArePaused()) {
3265         server.clients_pause_end_time = 0;
3266         clientsArePaused(); /* Just use the side effect of the function. */
3267     }
3268     server.cluster->mf_end = 0; /* No manual failover in progress. */
3269     server.cluster->mf_can_start = 0;
3270     server.cluster->mf_slave = NULL;
3271     server.cluster->mf_master_offset = 0;
3272 }
3273 
3274 /* If a manual failover timed out, abort it. */
3275 void manualFailoverCheckTimeout(void) {
3276     if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
3277         serverLog(LL_WARNING,"Manual failover timed out.");
3278         resetManualFailover();
3279     }
3280 }
3281 
3282 /* This function is called from the cluster cron function in order to go
3283  * forward with a manual failover state machine. */
3284 void clusterHandleManualFailover(void) {
3285     /* Return ASAP if no manual failover is in progress. */
3286     if (server.cluster->mf_end == 0) return;
3287 
3288     /* If mf_can_start is non-zero, the failover was already triggered so the
3289      * next steps are performed by clusterHandleSlaveFailover(). */
3290     if (server.cluster->mf_can_start) return;
3291 
3292     if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */
3293 
3294     if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
3295         /* Our replication offset matches the master replication offset
3296          * announced after clients were paused. We can start the failover. */
3297         server.cluster->mf_can_start = 1;
3298         serverLog(LL_WARNING,
3299             "All master replication stream processed, "
3300             "manual failover can start.");
3301     }
3302 }
3303 
3304 /* -----------------------------------------------------------------------------
3305  * CLUSTER cron job
3306  * -------------------------------------------------------------------------- */
3307 
3308 /* This is executed 10 times every second */
3309 void clusterCron(void) {
3310     dictIterator *di;
3311     dictEntry *de;
3312     int update_state = 0;
3313     int orphaned_masters; /* How many masters there are without ok slaves. */
3314     int max_slaves; /* Max number of ok slaves for a single master. */
3315     int this_slaves; /* Number of ok slaves for our master (if we are slave). */
3316     mstime_t min_pong = 0, now = mstime();
3317     clusterNode *min_pong_node = NULL;
3318     static unsigned long long iteration = 0;
3319     mstime_t handshake_timeout;
3320 
3321     iteration++; /* Number of times this function was called so far. */
3322 
3323     /* We want to keep myself->ip in sync with the cluster-announce-ip option.
3324      * The option can be set at runtime via CONFIG SET, so we periodically check
3325      * if the option changed to reflect this into myself->ip. */
3326     {
3327         static char *prev_ip = NULL;
3328         char *curr_ip = server.cluster_announce_ip;
3329         int changed = 0;
3330 
3331         if (prev_ip == NULL && curr_ip != NULL) changed = 1;
3332         else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
3333         else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;
3334 
3335         if (changed) {
3336             if (prev_ip) zfree(prev_ip);
3337             prev_ip = curr_ip;
3338 
3339             if (curr_ip) {
3340                 /* We always take a private copy of the new IP, by
3341                  * duplicating the string. This way later we can check if
3342                  * the address really changed. */
3343                 prev_ip = zstrdup(prev_ip);
3344                 strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
3345                 myself->ip[NET_IP_STR_LEN-1] = '\0';
3346             } else {
3347                 myself->ip[0] = '\0'; /* Force autodetection. */
3348             }
3349         }
3350     }
3351 
3352     /* The handshake timeout is the time after which a handshake node that was
3353      * not turned into a normal node is removed from the nodes table. Usually
3354      * it is just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we
3355      * use the value of 1 second. */
3356     handshake_timeout = server.cluster_node_timeout;
3357     if (handshake_timeout < 1000) handshake_timeout = 1000;
3358 
3359     /* Update myself flags. */
3360     clusterUpdateMyselfFlags();
3361 
3362     /* Check if we have disconnected nodes and re-establish the connection.
3363      * Also update a few stats while we are here, that can be used to make
3364      * better decisions in other parts of the code. */
3365     di = dictGetSafeIterator(server.cluster->nodes);
3366     server.cluster->stats_pfail_nodes = 0;
3367     while((de = dictNext(di)) != NULL) {
3368         clusterNode *node = dictGetVal(de);
3369 
3370         /* Not interested in reconnecting the link with myself or nodes
3371          * for which we have no address. */
3372         if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
3373 
3374         if (node->flags & CLUSTER_NODE_PFAIL)
3375             server.cluster->stats_pfail_nodes++;
3376 
3377         /* A Node in HANDSHAKE state has a limited lifespan equal to the
3378          * configured node timeout. */
3379         if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
3380             clusterDelNode(node);
3381             continue;
3382         }
3383 
3384         if (node->link == NULL) {
3385             int fd;
3386             mstime_t old_ping_sent;
3387             clusterLink *link;
3388 
3389             fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
3390                 node->cport, NET_FIRST_BIND_ADDR);
3391             if (fd == -1) {
3392                 /* We got a synchronous error from connect before
3393                  * clusterSendPing() had a chance to be called.
3394                  * If node->ping_sent is zero, failure detection can't work,
3395                  * so we claim we actually sent a ping now (that will
3396                  * be really sent as soon as the link is obtained). */
3397                 if (node->ping_sent == 0) node->ping_sent = mstime();
3398                 serverLog(LL_DEBUG, "Unable to connect to "
3399                     "Cluster Node [%s]:%d -> %s", node->ip,
3400                     node->cport, server.neterr);
3401                 continue;
3402             }
3403             link = createClusterLink(node);
3404             link->fd = fd;
3405             node->link = link;
3406             aeCreateFileEvent(server.el,link->fd,AE_READABLE,
3407                     clusterReadHandler,link);
3408             /* Queue a PING in the new connection ASAP: this is crucial
3409              * to avoid false positives in failure detection.
3410              *
3411              * If the node is flagged as MEET, we send a MEET message instead
3412              * of a PING one, to force the receiver to add us in its node
3413              * table. */
3414             old_ping_sent = node->ping_sent;
3415             clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
3416                     CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
3417             if (old_ping_sent) {
3418                 /* If there was an active ping before the link was
3419                  * disconnected, we want to restore the ping time; otherwise
3420                  * it would be replaced by the clusterSendPing() call. */
3421                 node->ping_sent = old_ping_sent;
3422             }
3423             /* We can clear the flag after the first packet is sent.
3424              * If we'll never receive a PONG, we'll never send new packets
3425              * to this node. Instead after the PONG is received and we
3426              * are no longer in meet/handshake status, we want to send
3427              * normal PING packets. */
3428             node->flags &= ~CLUSTER_NODE_MEET;
3429 
3430             serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
3431                     node->name, node->ip, node->cport);
3432         }
3433     }
3434     dictReleaseIterator(di);
3435 
3436     /* Ping some random node once every 10 iterations, so that we usually ping
3437      * one random node every second. */
3438     if (!(iteration % 10)) {
3439         int j;
3440 
3441         /* Check a few random nodes and ping the one with the oldest
3442          * pong_received time. */
3443         for (j = 0; j < 5; j++) {
3444             de = dictGetRandomKey(server.cluster->nodes);
3445             clusterNode *this = dictGetVal(de);
3446 
3447             /* Don't ping nodes disconnected or with a ping currently active. */
3448             if (this->link == NULL || this->ping_sent != 0) continue;
3449             if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
3450                 continue;
3451             if (min_pong_node == NULL || min_pong > this->pong_received) {
3452                 min_pong_node = this;
3453                 min_pong = this->pong_received;
3454             }
3455         }
3456         if (min_pong_node) {
3457             serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
3458             clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
3459         }
3460     }
3461 
3462     /* Iterate nodes to check if we need to flag something as failing.
3463      * This loop is also responsible for the following:
3464      * 1) Check if there are orphaned masters (masters without non failing
3465      *    slaves).
3466      * 2) Count the max number of non failing slaves for a single master.
3467      * 3) Count the number of slaves for our master, if we are a slave. */
3468     orphaned_masters = 0;
3469     max_slaves = 0;
3470     this_slaves = 0;
3471     di = dictGetSafeIterator(server.cluster->nodes);
3472     while((de = dictNext(di)) != NULL) {
3473         clusterNode *node = dictGetVal(de);
3474         now = mstime(); /* Use an updated time at every iteration. */
3475         mstime_t delay;
3476 
3477         if (node->flags &
3478             (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
3479                 continue;
3480 
3481         /* Orphaned master check, useful only if the current instance
3482          * is a slave that may migrate to another master. */
3483         if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
3484             int okslaves = clusterCountNonFailingSlaves(node);
3485 
3486             /* A master is orphaned if it is serving a non-zero number of
3487              * slots, has no working slaves, but used to have at least one
3488              * slave, or failed over a master that used to have slaves. */
3489             if (okslaves == 0 && node->numslots > 0 &&
3490                 node->flags & CLUSTER_NODE_MIGRATE_TO)
3491             {
3492                 orphaned_masters++;
3493             }
3494             if (okslaves > max_slaves) max_slaves = okslaves;
3495             if (nodeIsSlave(myself) && myself->slaveof == node)
3496                 this_slaves = okslaves;
3497         }
3498 
3499         /* If we are waiting for the PONG more than half the cluster
3500          * timeout, reconnect the link: maybe there is a connection
3501          * issue even if the node is alive. */
3502         if (node->link && /* is connected */
3503             now - node->link->ctime >
3504             server.cluster_node_timeout && /* was not already reconnected */
3505             node->ping_sent && /* we already sent a ping */
3506             node->pong_received < node->ping_sent && /* still waiting pong */
3507             /* and we are waiting for the pong more than timeout/2 */
3508             now - node->ping_sent > server.cluster_node_timeout/2)
3509         {
3510             /* Disconnect the link, it will be reconnected automatically. */
3511             freeClusterLink(node->link);
3512         }
3513 
3514         /* If we have currently no active ping in this instance, and the
3515          * received PONG is older than half the cluster timeout, send
3516          * a new ping now, to ensure all the nodes are pinged without
3517      * too big a delay. */
3518         if (node->link &&
3519             node->ping_sent == 0 &&
3520             (now - node->pong_received) > server.cluster_node_timeout/2)
3521         {
3522             clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3523             continue;
3524         }
3525 
3526         /* If we are a master and one of the slaves requested a manual
3527          * failover, ping it continuously. */
3528         if (server.cluster->mf_end &&
3529             nodeIsMaster(myself) &&
3530             server.cluster->mf_slave == node &&
3531             node->link)
3532         {
3533             clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3534             continue;
3535         }
3536 
3537         /* Check only if we have an active ping for this instance. */
3538         if (node->ping_sent == 0) continue;
3539 
3540         /* Compute the delay of the PONG. Note that if we already received
3541          * the PONG, then node->ping_sent is zero, so we can't reach this
3542          * code at all. */
3543         delay = now - node->ping_sent;
3544 
3545         if (delay > server.cluster_node_timeout) {
3546             /* Timeout reached. Set the node as possibly failing if it is
3547              * not already in this state. */
3548             if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
3549                 serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
3550                     node->name);
3551                 node->flags |= CLUSTER_NODE_PFAIL;
3552                 update_state = 1;
3553             }
3554         }
3555     }
3556     dictReleaseIterator(di);
3557 
3558     /* If we are a slave node but the replication is still turned off,
3559      * enable it if we know the address of our master and it appears to
3560      * be up. */
3561     if (nodeIsSlave(myself) &&
3562         server.masterhost == NULL &&
3563         myself->slaveof &&
3564         nodeHasAddr(myself->slaveof))
3565     {
3566         replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
3567     }
3568 
3569     /* Abort a manual failover if the timeout is reached. */
3570     manualFailoverCheckTimeout();
3571 
3572     if (nodeIsSlave(myself)) {
3573         clusterHandleManualFailover();
3574         if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
3575             clusterHandleSlaveFailover();
3576         /* If there are orphaned masters, and we are a slave among the masters
3577          * with the max number of non-failing slaves, consider migrating to
3578          * the orphaned masters. Note that it does not make sense to try
3579          * a migration if there is no master with at least *two* working
3580          * slaves. */
3581         if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
3582             clusterHandleSlaveMigration(max_slaves);
3583     }
3584 
3585     if (update_state || server.cluster->state == CLUSTER_FAIL)
3586         clusterUpdateState();
3587 }
3588 
3589 /* This function is called before the event handler returns to sleep for
3590  * events. It is useful to perform operations that must be done ASAP in
3591  * reaction to events fired but that are not safe to perform inside event
3592  * handlers, or to perform potentially expensive tasks that we need to do
3593  * a single time before replying to clients. */
3594 void clusterBeforeSleep(void) {
3595     /* Handle failover, this is needed when it is likely that there is already
3596      * the quorum from masters in order to react fast. */
3597     if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)
3598         clusterHandleSlaveFailover();
3599 
3600     /* Update the cluster state. */
3601     if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)
3602         clusterUpdateState();
3603 
3604     /* Save the config, possibly using fsync. */
3605     if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {
3606         int fsync = server.cluster->todo_before_sleep &
3607                     CLUSTER_TODO_FSYNC_CONFIG;
3608         clusterSaveConfigOrDie(fsync);
3609     }
3610 
3611     /* Reset our flags (not strictly needed since every single function
3612      * called for flags set should be able to clear its flag). */
3613     server.cluster->todo_before_sleep = 0;
3614 }
3615 
3616 void clusterDoBeforeSleep(int flags) {
3617     server.cluster->todo_before_sleep |= flags;
3618 }
3619 
3620 /* -----------------------------------------------------------------------------
3621  * Slots management
3622  * -------------------------------------------------------------------------- */
3623 
3624 /* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set,
3625  * otherwise 0. */
3626 int bitmapTestBit(unsigned char *bitmap, int pos) {
3627     off_t byte = pos/8;
3628     int bit = pos&7;
3629     return (bitmap[byte] & (1<<bit)) != 0;
3630 }
3631 
3632 /* Set the bit at position 'pos' in a bitmap. */
3633 void bitmapSetBit(unsigned char *bitmap, int pos) {
3634     off_t byte = pos/8;
3635     int bit = pos&7;
3636     bitmap[byte] |= 1<<bit;
3637 }
3638 
3639 /* Clear the bit at position 'pos' in a bitmap. */
3640 void bitmapClearBit(unsigned char *bitmap, int pos) {
3641     off_t byte = pos/8;
3642     int bit = pos&7;
3643     bitmap[byte] &= ~(1<<bit);
3644 }
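
/* Editorial usage sketch (hypothetical helper, not part of the original
 * file): slot bitmaps pack CLUSTER_SLOTS bits, eight per byte, one bit
 * per hash slot. */
static void bitmapUsageSketch(void) {
    unsigned char slots[CLUSTER_SLOTS/8] = {0};
    bitmapSetBit(slots,100);                     /* claim slot 100 */
    serverAssert(bitmapTestBit(slots,100) == 1);
    bitmapClearBit(slots,100);                   /* release it */
    serverAssert(bitmapTestBit(slots,100) == 0);
}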
3645 
3646 /* Return non-zero if there is at least one master with slaves in the cluster.
3647  * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
3648  * MIGRATE_TO flag when a master gets its first slot. */
3649 int clusterMastersHaveSlaves(void) {
3650     dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3651     dictEntry *de;
3652     int slaves = 0;
3653     while((de = dictNext(di)) != NULL) {
3654         clusterNode *node = dictGetVal(de);
3655 
3656         if (nodeIsSlave(node)) continue;
3657         slaves += node->numslaves;
3658     }
3659     dictReleaseIterator(di);
3660     return slaves != 0;
3661 }
3662 
3663 /* Set the slot bit and return the old value. */
3664 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
3665     int old = bitmapTestBit(n->slots,slot);
3666     bitmapSetBit(n->slots,slot);
3667     if (!old) {
3668         n->numslots++;
3669         /* When a master gets its first slot, even if it has no slaves,
3670          * it gets flagged with MIGRATE_TO, that is, the master is a valid
3671          * target for replicas migration, if and only if at least one of
3672          * the other masters has slaves right now.
3673          *
3674          * Normally masters are valid targets of replica migration if:
3675          * 1. They used to have slaves (but no longer have them).
3676          * 2. They are slaves failing over a master that used to have slaves.
3677          *
3678          * However new masters with slots assigned are considered valid
3679          * migration targets if the rest of the cluster is not slave-less.
3680          *
3681          * See https://github.com/antirez/redis/issues/3043 for more info. */
3682         if (n->numslots == 1 && clusterMastersHaveSlaves())
3683             n->flags |= CLUSTER_NODE_MIGRATE_TO;
3684     }
3685     return old;
3686 }
3687 
3688 /* Clear the slot bit and return the old value. */
3689 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
3690     int old = bitmapTestBit(n->slots,slot);
3691     bitmapClearBit(n->slots,slot);
3692     if (old) n->numslots--;
3693     return old;
3694 }
3695 
3696 /* Return the slot bit from the cluster node structure. */
3697 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
3698     return bitmapTestBit(n->slots,slot);
3699 }
3700 
3701 /* Add the specified slot to the list of slots that node 'n' will
3702  * serve. Return C_OK if the operation ended with success.
3703  * If the slot is already assigned to another instance this is considered
3704  * an error and C_ERR is returned. */
3705 int clusterAddSlot(clusterNode *n, int slot) {
3706     if (server.cluster->slots[slot]) return C_ERR;
3707     clusterNodeSetSlotBit(n,slot);
3708     server.cluster->slots[slot] = n;
3709     return C_OK;
3710 }
3711 
3712 /* Delete the specified slot marking it as unassigned.
3713  * Returns C_OK if the slot was assigned, otherwise if the slot was
3714  * already unassigned C_ERR is returned. */
3715 int clusterDelSlot(int slot) {
3716     clusterNode *n = server.cluster->slots[slot];
3717 
3718     if (!n) return C_ERR;
3719     serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
3720     server.cluster->slots[slot] = NULL;
3721     return C_OK;
3722 }
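
/* Illustrative sketch (not part of the original source) of the
 * add/delete contract, assuming 'other' is another known node:
 *
 *   clusterAddSlot(myself,42);   // C_OK, slot now owned by myself
 *   clusterAddSlot(other,42);    // C_ERR, slot already assigned
 *   clusterDelSlot(42);          // C_OK, slot unassigned again
 *   clusterDelSlot(42);          // C_ERR, it was already unassigned
 */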
3723 
3724 /* Delete all the slots associated with the specified node.
3725  * The number of deleted slots is returned. */
3726 int clusterDelNodeSlots(clusterNode *node) {
3727     int deleted = 0, j;
3728 
3729     for (j = 0; j < CLUSTER_SLOTS; j++) {
3730         if (clusterNodeGetSlotBit(node,j)) {
3731             clusterDelSlot(j);
3732             deleted++;
3733         }
3734     }
3735     return deleted;
3736 }
3737 
3738 /* Clear the migrating / importing state for all the slots.
3739  * This is useful at initialization and when turning a master into a slave. */
3740 void clusterCloseAllSlots(void) {
3741     memset(server.cluster->migrating_slots_to,0,
3742         sizeof(server.cluster->migrating_slots_to));
3743     memset(server.cluster->importing_slots_from,0,
3744         sizeof(server.cluster->importing_slots_from));
3745 }
3746 
3747 /* -----------------------------------------------------------------------------
3748  * Cluster state evaluation function
3749  * -------------------------------------------------------------------------- */
3750 
3751 /* The following defines are only used in the evaluation function and are
3752  * based on heuristics. The main point about the rejoin and writable delay
3753  * is that they should be a few orders of magnitude larger than the network
3754  * latency. */
3755 #define CLUSTER_MAX_REJOIN_DELAY 5000
3756 #define CLUSTER_MIN_REJOIN_DELAY 500
3757 #define CLUSTER_WRITABLE_DELAY 2000
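
/* Worked example: clusterUpdateState() clamps the rejoin delay, which
 * starts at server.cluster_node_timeout, into the range above. A node
 * timeout of 30000 ms thus yields a 5000 ms rejoin delay, while a very
 * small timeout of 100 ms is raised to 500 ms. */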
3758 
3759 void clusterUpdateState(void) {
3760     int j, new_state;
3761     int reachable_masters = 0;
3762     static mstime_t among_minority_time;
3763     static mstime_t first_call_time = 0;
3764 
3765     server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
3766 
3767     /* If this is a master node, wait some time before turning the state
3768      * into OK, since it is not a good idea to rejoin the cluster as a writable
3769      * master, after a reboot, without giving the cluster a chance to
3770      * reconfigure this node. Note that the delay is calculated starting from
3771      * the first call to this function and not since the server start, so
3772      * that the DB loading time is not counted. */
3773     if (first_call_time == 0) first_call_time = mstime();
3774     if (nodeIsMaster(myself) &&
3775         server.cluster->state == CLUSTER_FAIL &&
3776         mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;
3777 
3778     /* Start assuming the state is OK. We'll turn it into FAIL if there
3779      * are the right conditions. */
3780     new_state = CLUSTER_OK;
3781 
3782     /* Check if all the slots are covered. */
3783     if (server.cluster_require_full_coverage) {
3784         for (j = 0; j < CLUSTER_SLOTS; j++) {
3785             if (server.cluster->slots[j] == NULL ||
3786                 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
3787             {
3788                 new_state = CLUSTER_FAIL;
3789                 break;
3790             }
3791         }
3792     }
3793 
3794     /* Compute the cluster size, that is the number of master nodes
3795      * serving at least a single slot.
3796      *
3797      * At the same time count the number of reachable masters having
3798      * at least one slot. */
3799     {
3800         dictIterator *di;
3801         dictEntry *de;
3802 
3803         server.cluster->size = 0;
3804         di = dictGetSafeIterator(server.cluster->nodes);
3805         while((de = dictNext(di)) != NULL) {
3806             clusterNode *node = dictGetVal(de);
3807 
3808             if (nodeIsMaster(node) && node->numslots) {
3809                 server.cluster->size++;
3810                 if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
3811                     reachable_masters++;
3812             }
3813         }
3814         dictReleaseIterator(di);
3815     }
3816 
3817     /* If we are in a minority partition, change the cluster state
3818      * to FAIL. For example, with five slot-serving masters the needed
3819      * quorum is three reachable masters. */
3819     {
3820         int needed_quorum = (server.cluster->size / 2) + 1;
3821 
3822         if (reachable_masters < needed_quorum) {
3823             new_state = CLUSTER_FAIL;
3824             among_minority_time = mstime();
3825         }
3826     }
3827 
3828     /* Apply the new state if it changed, and log the event. */
3829     if (new_state != server.cluster->state) {
3830         mstime_t rejoin_delay = server.cluster_node_timeout;
3831 
3832         /* If the instance is a master and was partitioned away with the
3833          * minority, don't let it accept queries for some time after the
3834          * partition heals, to make sure there is enough time to receive
3835          * a configuration update. */
3836         if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
3837             rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
3838         if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
3839             rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
3840 
3841         if (new_state == CLUSTER_OK &&
3842             nodeIsMaster(myself) &&
3843             mstime() - among_minority_time < rejoin_delay)
3844         {
3845             return;
3846         }
3847 
3848         /* Change the state and log the event. */
3849         serverLog(LL_WARNING,"Cluster state changed: %s",
3850             new_state == CLUSTER_OK ? "ok" : "fail");
3851         server.cluster->state = new_state;
3852     }
3853 }
3854 
3855 /* This function is called after the node startup in order to verify that data
3856  * loaded from disk is in agreement with the cluster configuration:
3857  *
3858  * 1) If we find keys about hash slots we have no responsibility for, the
3859  *    following happens:
3860  *    A) If no other node is in charge according to the current cluster
3861  *       configuration, we add these slots to our node.
3862  *    B) If according to our config other nodes are already in charge for
3863  *       this lots, we set the slots as IMPORTING from our point of view
3864  *       in order to justify we have those slots, and in order to make
3865  *       redis-trib aware of the issue, so that it can try to fix it.
3866  * 2) If we find data in a DB different than DB0 we return C_ERR to
3867  *    signal the caller it should quit the server with an error message
3868  *    or take other actions.
3869  *
3870  * The function returns C_OK even when it has to correct the error
3871  * described in "1". However, if data is found in a DB different
3872  * from DB0, C_ERR is returned.
3873  *
3874  * The function also uses the logging facility in order to warn the user
3875  * about desynchronizations between the data we have in memory and the
3876  * cluster configuration. */
3877 int verifyClusterConfigWithData(void) {
3878     int j;
3879     int update_config = 0;
3880 
3881     /* Return ASAP if a module disabled cluster redirections. In that case
3882      * every master can store keys about every possible hash slot. */
3883     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
3884         return C_OK;
3885 
3886     /* If this node is a slave, don't perform the check at all as we
3887      * completely depend on the replication stream. */
3888     if (nodeIsSlave(myself)) return C_OK;
3889 
3890     /* Make sure we only have keys in DB0. */
3891     for (j = 1; j < server.dbnum; j++) {
3892         if (dictSize(server.db[j].dict)) return C_ERR;
3893     }
3894 
3895     /* Check that all the slots we see populated in memory have a
3896      * corresponding entry in the cluster table. Otherwise fix the table. */
3897     for (j = 0; j < CLUSTER_SLOTS; j++) {
3898         if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
3899         /* Check if we are assigned to this slot or if we are importing it.
3900          * In both cases check the next slot as the configuration makes
3901          * sense. */
3902         if (server.cluster->slots[j] == myself ||
3903             server.cluster->importing_slots_from[j] != NULL) continue;
3904 
3905         /* If we are here, data and cluster config don't agree: slot 'j'
3906          * is populated even though we are not importing it, nor are we
3907          * assigned to it. Fix this condition. */
3908 
3909         update_config++;
3910         /* Case A: slot is unassigned. Take responsibility for it. */
3911         if (server.cluster->slots[j] == NULL) {
3912             serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
3913                                     "Taking responsibility for it.",j);
3914             clusterAddSlot(myself,j);
3915         } else {
3916             serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
3917                                     "assigned to another node. "
3918                                     "Setting it to importing state.",j);
3919             server.cluster->importing_slots_from[j] = server.cluster->slots[j];
3920         }
3921     }
3922     if (update_config) clusterSaveConfigOrDie(1);
3923     return C_OK;
3924 }
3925 
3926 /* -----------------------------------------------------------------------------
3927  * SLAVE nodes handling
3928  * -------------------------------------------------------------------------- */
3929 
3930 /* Set the specified node 'n' as master for this node.
3931  * If this node is currently a master, it is turned into a slave. */
3932 void clusterSetMaster(clusterNode *n) {
3933     serverAssert(n != myself);
3934     serverAssert(myself->numslots == 0);
3935 
3936     if (nodeIsMaster(myself)) {
3937         myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
3938         myself->flags |= CLUSTER_NODE_SLAVE;
3939         clusterCloseAllSlots();
3940     } else {
3941         if (myself->slaveof)
3942             clusterNodeRemoveSlave(myself->slaveof,myself);
3943     }
3944     myself->slaveof = n;
3945     clusterNodeAddSlave(n,myself);
3946     replicationSetMaster(n->ip, n->port);
3947     resetManualFailover();
3948 }
3949 
3950 /* -----------------------------------------------------------------------------
3951  * Nodes to string representation functions.
3952  * -------------------------------------------------------------------------- */
3953 
3954 struct redisNodeFlags {
3955     uint16_t flag;
3956     char *name;
3957 };
3958 
3959 static struct redisNodeFlags redisNodeFlagsTable[] = {
3960     {CLUSTER_NODE_MYSELF,       "myself,"},
3961     {CLUSTER_NODE_MASTER,       "master,"},
3962     {CLUSTER_NODE_SLAVE,        "slave,"},
3963     {CLUSTER_NODE_PFAIL,        "fail?,"},
3964     {CLUSTER_NODE_FAIL,         "fail,"},
3965     {CLUSTER_NODE_HANDSHAKE,    "handshake,"},
3966     {CLUSTER_NODE_NOADDR,       "noaddr,"},
3967     {CLUSTER_NODE_NOFAILOVER,   "nofailover,"}
3968 };
3969 
3970 /* Concatenate the comma separated list of node flags to the given SDS
3971  * string 'ci'. */
3972 sds representClusterNodeFlags(sds ci, uint16_t flags) {
3973     size_t orig_len = sdslen(ci);
3974     int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
3975     for (i = 0; i < size; i++) {
3976         struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
3977         if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
3978     }
3979     /* If no flag was added, add the "noflags" special flag. */
3980     if (sdslen(ci) == orig_len) ci = sdscat(ci,"noflags,");
3981     sdsIncrLen(ci,-1); /* Remove trailing comma. */
3982     return ci;
3983 }
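
/* Illustrative example (hypothetical values): for a node with
 * CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER set,
 *
 *   ci = representClusterNodeFlags(ci,node->flags);
 *
 * appends "myself,master" (the trailing comma is trimmed), while a node
 * with no flag from the table set yields "noflags". */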
3984 
3985 /* Generate a csv-alike representation of the specified cluster node.
3986  * See clusterGenNodesDescription() top comment for more information.
3987  *
3988  * The function returns the string representation as an SDS string. */
3989 sds clusterGenNodeDescription(clusterNode *node) {
3990     int j, start;
3991     sds ci;
3992 
3993     /* Node coordinates */
3994     ci = sdscatprintf(sdsempty(),"%.40s %s:%d@%d ",
3995         node->name,
3996         node->ip,
3997         node->port,
3998         node->cport);
3999 
4000     /* Flags */
4001     ci = representClusterNodeFlags(ci, node->flags);
4002 
4003     /* Slave of... or just "-" */
4004     if (node->slaveof)
4005         ci = sdscatprintf(ci," %.40s ",node->slaveof->name);
4006     else
4007         ci = sdscatlen(ci," - ",3);
4008 
4009     /* Latency from the POV of this node, config epoch, link status */
4010     ci = sdscatprintf(ci,"%lld %lld %llu %s",
4011         (long long) node->ping_sent,
4012         (long long) node->pong_received,
4013         (unsigned long long) node->configEpoch,
4014         (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
4015                     "connected" : "disconnected");
4016 
4017     /* Slots served by this instance */
4018     start = -1;
4019     for (j = 0; j < CLUSTER_SLOTS; j++) {
4020         int bit;
4021 
4022         if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
4023             if (start == -1) start = j;
4024         }
4025         if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
4026             if (bit && j == CLUSTER_SLOTS-1) j++;
4027 
4028             if (start == j-1) {
4029                 ci = sdscatprintf(ci," %d",start);
4030             } else {
4031                 ci = sdscatprintf(ci," %d-%d",start,j-1);
4032             }
4033             start = -1;
4034         }
4035     }
4036 
4037     /* For the MYSELF node only, we also dump info about slots that
4038      * we are migrating to other instances or importing from other
4039      * instances. */
4040     if (node->flags & CLUSTER_NODE_MYSELF) {
4041         for (j = 0; j < CLUSTER_SLOTS; j++) {
4042             if (server.cluster->migrating_slots_to[j]) {
4043                 ci = sdscatprintf(ci," [%d->-%.40s]",j,
4044                     server.cluster->migrating_slots_to[j]->name);
4045             } else if (server.cluster->importing_slots_from[j]) {
4046                 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
4047                     server.cluster->importing_slots_from[j]->name);
4048             }
4049         }
4050     }
4051     return ci;
4052 }
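
/* Illustrative sample of the line produced by the function above, for
 * a master serving slots 0-5460 (node ID and timestamps are made up):
 *
 *   e7d1eecce1... 127.0.0.1:7000@17000 myself,master - 0 1426238316232 2 connected 0-5460
 *
 * A non-contiguous ownership such as slots 0..100 plus slot 102 is
 * rendered as " 0-100 102" by the range-compression loop above. */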
4053 
4054 /* Generate a csv-alike representation of the nodes we are aware of,
4055  * including the "myself" node, and return an SDS string containing the
4056  * representation (it is up to the caller to free it).
4057  *
4058  * All the nodes matching at least one of the node flags specified in
4059  * "filter" are excluded from the output, so using zero as a filter will
4060  * include all the known nodes in the representation, including nodes in
4061  * the HANDSHAKE state.
4062  *
4063  * The representation obtained using this function is used for the output
4064  * of the CLUSTER NODES function, and as format for the cluster
4065  * configuration file (nodes.conf) for a given node. */
4066 sds clusterGenNodesDescription(int filter) {
4067     sds ci = sdsempty(), ni;
4068     dictIterator *di;
4069     dictEntry *de;
4070 
4071     di = dictGetSafeIterator(server.cluster->nodes);
4072     while((de = dictNext(di)) != NULL) {
4073         clusterNode *node = dictGetVal(de);
4074 
4075         if (node->flags & filter) continue;
4076         ni = clusterGenNodeDescription(node);
4077         ci = sdscatsds(ci,ni);
4078         sdsfree(ni);
4079         ci = sdscatlen(ci,"\n",1);
4080     }
4081     dictReleaseIterator(di);
4082     return ci;
4083 }
4084 
4085 /* -----------------------------------------------------------------------------
4086  * CLUSTER command
4087  * -------------------------------------------------------------------------- */
4088 
4089 const char *clusterGetMessageTypeString(int type) {
4090     switch(type) {
4091     case CLUSTERMSG_TYPE_PING: return "ping";
4092     case CLUSTERMSG_TYPE_PONG: return "pong";
4093     case CLUSTERMSG_TYPE_MEET: return "meet";
4094     case CLUSTERMSG_TYPE_FAIL: return "fail";
4095     case CLUSTERMSG_TYPE_PUBLISH: return "publish";
4096     case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
4097     case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
4098     case CLUSTERMSG_TYPE_UPDATE: return "update";
4099     case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
4100     case CLUSTERMSG_TYPE_MODULE: return "module";
4101     }
4102     return "unknown";
4103 }
4104 
4105 int getSlotOrReply(client *c, robj *o) {
4106     long long slot;
4107 
4108     if (getLongLongFromObject(o,&slot) != C_OK ||
4109         slot < 0 || slot >= CLUSTER_SLOTS)
4110     {
4111         addReplyError(c,"Invalid or out of range slot");
4112         return -1;
4113     }
4114     return (int) slot;
4115 }
4116 
4117 void clusterReplyMultiBulkSlots(client *c) {
4118     /* Format: 1) 1) start slot
4119      *            2) end slot
4120      *            3) 1) master IP
4121      *               2) master port
4122      *               3) node ID
4123      *            4) 1) replica IP
4124      *               2) replica port
4125      *               3) node ID
4126      *           ... continued until done
4127      */
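    /* Illustrative reply for a two-master cluster (hypothetical
     * addresses and node IDs):
     *
     *   1) 1) (integer) 0
     *      2) (integer) 8191
     *      3) 1) "127.0.0.1"
     *         2) (integer) 7000
     *         3) "e7d1eecce1..."
     *   2) 1) (integer) 8192
     *      2) (integer) 16383
     *      3) 1) "127.0.0.1"
     *         2) (integer) 7001
     *         3) "67ed2db8d6..."
     */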
4128 
4129     int num_masters = 0;
4130     void *slot_replylen = addDeferredMultiBulkLength(c);
4131 
4132     dictEntry *de;
4133     dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
4134     while((de = dictNext(di)) != NULL) {
4135         clusterNode *node = dictGetVal(de);
4136         int j = 0, start = -1;
4137 
4138         /* Skip slaves (that are iterated when producing the output of their
4139          * master) and masters not serving any slot. */
4140         if (!nodeIsMaster(node) || node->numslots == 0) continue;
4141 
4142         for (j = 0; j < CLUSTER_SLOTS; j++) {
4143             int bit, i;
4144 
4145             if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
4146                 if (start == -1) start = j;
4147             }
4148             if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
4149                 int nested_elements = 3; /* slots (2) + master addr (1). */
4150                 void *nested_replylen = addDeferredMultiBulkLength(c);
4151 
4152                 if (bit && j == CLUSTER_SLOTS-1) j++;
4153 
4154                 /* If the slot exists in the output map, add to its list;
4155                  * else, create a new output map for this slot. */
4156                 if (start == j-1) {
4157                     addReplyLongLong(c, start); /* only one slot; low==high */
4158                     addReplyLongLong(c, start);
4159                 } else {
4160                     addReplyLongLong(c, start); /* low */
4161                     addReplyLongLong(c, j-1);   /* high */
4162                 }
4163                 start = -1;
4164 
4165                 /* First node reply position is always the master */
4166                 addReplyMultiBulkLen(c, 3);
4167                 addReplyBulkCString(c, node->ip);
4168                 addReplyLongLong(c, node->port);
4169                 addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
4170 
4171                 /* Remaining nodes in reply are replicas for slot range */
4172                 for (i = 0; i < node->numslaves; i++) {
4173                     /* This loop is copy/pasted from clusterGenNodeDescription()
4174                      * with modifications for per-slot node aggregation */
4175                     if (nodeFailed(node->slaves[i])) continue;
4176                     addReplyMultiBulkLen(c, 3);
4177                     addReplyBulkCString(c, node->slaves[i]->ip);
4178                     addReplyLongLong(c, node->slaves[i]->port);
4179                     addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
4180                     nested_elements++;
4181                 }
4182                 setDeferredMultiBulkLength(c, nested_replylen, nested_elements);
4183                 num_masters++;
4184             }
4185         }
4186     }
4187     dictReleaseIterator(di);
4188     setDeferredMultiBulkLength(c, slot_replylen, num_masters);
4189 }
4190 
4191 void clusterCommand(client *c) {
4192     if (server.cluster_enabled == 0) {
4193         addReplyError(c,"This instance has cluster support disabled");
4194         return;
4195     }
4196 
4197     if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
4198         const char *help[] = {
4199 "ADDSLOTS <slot> [slot ...] -- Assign slots to current node.",
4200 "BUMPEPOCH -- Advance the cluster config epoch.",
4201 "COUNT-failure-reports <node-id> -- Return number of failure reports for <node-id>.",
4202 "COUNTKEYSINSLOT <slot> -- Return the number of keys in <slot>.",
4203 "DELSLOTS <slot> [slot ...] -- Delete slots information from current node.",
4204 "FAILOVER [force|takeover] -- Promote current replica node to master.",
4205 "FORGET <node-id> -- Remove a node from the cluster.",
4206 "GETKEYSINSLOT <slot> <count> -- Return key names stored by current node in a slot.",
4207 "FLUSHSLOTS -- Delete current node's own slots information.",
4208 "INFO -- Return information about the cluster.",
4209 "KEYSLOT <key> -- Return the hash slot for <key>.",
4210 "MEET <ip> <port> [bus-port] -- Connect nodes into a working cluster.",
4211 "MYID -- Return the node id.",
4212 "NODES -- Return cluster configuration seen by node. Output format:",
4213 "    <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ... <slot>",
4214 "REPLICATE <node-id> -- Configure current node as replica to <node-id>.",
4215 "RESET [hard|soft] -- Reset current node (default: soft).",
4216 "SET-config-epoch <epoch> -- Set config epoch of current node.",
4217 "SETSLOT <slot> (importing|migrating|stable|node <node-id>) -- Set slot state.",
4218 "REPLICAS <node-id> -- Return <node-id> replicas.",
4219 "SLOTS -- Return information about slots range mappings. Each range is made of:",
4220 "    start, end, master and replicas IP addresses, ports and ids",
4221 NULL
4222         };
4223         addReplyHelp(c, help);
4224     } else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
4225         /* CLUSTER MEET <ip> <port> [cport] */
4226         long long port, cport;
4227 
4228         if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
4229             addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
4230                                 (char*)c->argv[3]->ptr);
4231             return;
4232         }
4233 
4234         if (c->argc == 5) {
4235             if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
4236                 addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
4237                                     (char*)c->argv[4]->ptr);
4238                 return;
4239             }
4240         } else {
4241             cport = port + CLUSTER_PORT_INCR;
4242         }
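        /* For example "CLUSTER MEET 127.0.0.1 7000" defaults the bus
         * port to 17000, since CLUSTER_PORT_INCR is 10000. */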
4243 
4244         if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
4245             errno == EINVAL)
4246         {
4247             addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
4248                             (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
4249         } else {
4250             addReply(c,shared.ok);
4251         }
4252     } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
4253         /* CLUSTER NODES */
4254         robj *o;
4255         sds ci = clusterGenNodesDescription(0);
4256 
4257         o = createObject(OBJ_STRING,ci);
4258         addReplyBulk(c,o);
4259         decrRefCount(o);
4260     } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
4261         /* CLUSTER MYID */
4262         addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
4263     } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
4264         /* CLUSTER SLOTS */
4265         clusterReplyMultiBulkSlots(c);
4266     } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
4267         /* CLUSTER FLUSHSLOTS */
4268         if (dictSize(server.db[0].dict) != 0) {
4269             addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
4270             return;
4271         }
4272         clusterDelNodeSlots(myself);
4273         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4274         addReply(c,shared.ok);
4275     } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
4276                !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
4277     {
4278         /* CLUSTER ADDSLOTS <slot> [slot] ... */
4279         /* CLUSTER DELSLOTS <slot> [slot] ... */
4280         int j, slot;
4281         unsigned char *slots = zmalloc(CLUSTER_SLOTS);
4282         int del = !strcasecmp(c->argv[1]->ptr,"delslots");
4283 
4284         memset(slots,0,CLUSTER_SLOTS);
4285         /* Check that all the arguments are parseable and that all the
4286          * slots are not already busy. */
4287         for (j = 2; j < c->argc; j++) {
4288             if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
4289                 zfree(slots);
4290                 return;
4291             }
4292             if (del && server.cluster->slots[slot] == NULL) {
4293                 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
4294                 zfree(slots);
4295                 return;
4296             } else if (!del && server.cluster->slots[slot]) {
4297                 addReplyErrorFormat(c,"Slot %d is already busy", slot);
4298                 zfree(slots);
4299                 return;
4300             }
4301             if (slots[slot]++ == 1) {
4302                 addReplyErrorFormat(c,"Slot %d specified multiple times",
4303                     (int)slot);
4304                 zfree(slots);
4305                 return;
4306             }
4307         }
4308         for (j = 0; j < CLUSTER_SLOTS; j++) {
4309             if (slots[j]) {
4310                 int retval;
4311 
4312                 /* If this slot was set as importing we can clear this
4313                  * state as now we are the real owner of the slot. */
4314                 if (server.cluster->importing_slots_from[j])
4315                     server.cluster->importing_slots_from[j] = NULL;
4316 
4317                 retval = del ? clusterDelSlot(j) :
4318                                clusterAddSlot(myself,j);
4319                 serverAssertWithInfo(c,NULL,retval == C_OK);
4320             }
4321         }
4322         zfree(slots);
4323         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4324         addReply(c,shared.ok);
4325     } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
4326         /* SETSLOT 10 MIGRATING <node ID> */
4327         /* SETSLOT 10 IMPORTING <node ID> */
4328         /* SETSLOT 10 STABLE */
4329         /* SETSLOT 10 NODE <node ID> */
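        /* Typical resharding flow for moving slot 10 from node A to
         * node B (sketch): "SETSLOT 10 IMPORTING <A>" is sent to B,
         * "SETSLOT 10 MIGRATING <B>" is sent to A, the keys are then
         * moved with MIGRATE, and finally "SETSLOT 10 NODE <B>" is
         * sent to both nodes to finalize the new ownership. */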
4330         int slot;
4331         clusterNode *n;
4332 
4333         if (nodeIsSlave(myself)) {
4334             addReplyError(c,"Please use SETSLOT only with masters.");
4335             return;
4336         }
4337 
4338         if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
4339 
4340         if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
4341             if (server.cluster->slots[slot] != myself) {
4342                 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
4343                 return;
4344             }
4345             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4346                 addReplyErrorFormat(c,"I don't know about node %s",
4347                     (char*)c->argv[4]->ptr);
4348                 return;
4349             }
4350             server.cluster->migrating_slots_to[slot] = n;
4351         } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
4352             if (server.cluster->slots[slot] == myself) {
4353                 addReplyErrorFormat(c,
4354                     "I'm already the owner of hash slot %u",slot);
4355                 return;
4356             }
4357             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4358                 addReplyErrorFormat(c,"I don't know about node %s",
4359                     (char*)c->argv[4]->ptr);
4360                 return;
4361             }
4362             server.cluster->importing_slots_from[slot] = n;
4363         } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
4364             /* CLUSTER SETSLOT <SLOT> STABLE */
4365             server.cluster->importing_slots_from[slot] = NULL;
4366             server.cluster->migrating_slots_to[slot] = NULL;
4367         } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
4368             /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
4369             clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
4370 
4371             if (!n) {
4372                 addReplyErrorFormat(c,"Unknown node %s",
4373                     (char*)c->argv[4]->ptr);
4374                 return;
4375             }
4376             /* If this hash slot was served by 'myself' before the switch,
4377              * make sure there are no longer local keys for this hash slot. */
4378             if (server.cluster->slots[slot] == myself && n != myself) {
4379                 if (countKeysInSlot(slot) != 0) {
4380                     addReplyErrorFormat(c,
4381                         "Can't assign hashslot %d to a different node "
4382                         "while I still hold keys for this hash slot.", slot);
4383                     return;
4384                 }
4385             }
4386             /* If this slot is in migrating status but we have no keys
4387              * for it, assigning the slot to another node will clear
4388              * the migrating status. */
4389             if (countKeysInSlot(slot) == 0 &&
4390                 server.cluster->migrating_slots_to[slot])
4391                 server.cluster->migrating_slots_to[slot] = NULL;
4392 
4393             /* If this node was importing this slot, assigning the slot to
4394              * itself also clears the importing status. */
4395             if (n == myself &&
4396                 server.cluster->importing_slots_from[slot])
4397             {
4398                 /* This slot was manually migrated, set this node configEpoch
4399                  * to a new epoch so that the new version can be propagated
4400                  * by the cluster.
4401                  *
4402                  * Note that if this ever results in a collision with another
4403                  * node getting the same configEpoch, for example because a
4404                  * failover happens at the same time we close the slot, the
4405                  * configEpoch collision resolution will fix it assigning
4406                  * a different epoch to each node. */
4407                 if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
4408                     serverLog(LL_WARNING,
4409                         "configEpoch updated after importing slot %d", slot);
4410                 }
4411                 server.cluster->importing_slots_from[slot] = NULL;
4412             }
4413             clusterDelSlot(slot);
4414             clusterAddSlot(n,slot);
4415         } else {
4416             addReplyError(c,
4417                 "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
4418             return;
4419         }
4420         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
4421         addReply(c,shared.ok);
4422     } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
4423         /* CLUSTER BUMPEPOCH */
4424         int retval = clusterBumpConfigEpochWithoutConsensus();
4425         sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
4426                 (retval == C_OK) ? "BUMPED" : "STILL",
4427                 (unsigned long long) myself->configEpoch);
4428         addReplySds(c,reply);
4429     } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
4430         /* CLUSTER INFO */
4431         char *statestr[] = {"ok","fail","needhelp"};
4432         int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
4433         uint64_t myepoch;
4434         int j;
4435 
4436         for (j = 0; j < CLUSTER_SLOTS; j++) {
4437             clusterNode *n = server.cluster->slots[j];
4438 
4439             if (n == NULL) continue;
4440             slots_assigned++;
4441             if (nodeFailed(n)) {
4442                 slots_fail++;
4443             } else if (nodeTimedOut(n)) {
4444                 slots_pfail++;
4445             } else {
4446                 slots_ok++;
4447             }
4448         }
4449 
4450         myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
4451                   myself->slaveof->configEpoch : myself->configEpoch;
4452 
4453         sds info = sdscatprintf(sdsempty(),
4454             "cluster_state:%s\r\n"
4455             "cluster_slots_assigned:%d\r\n"
4456             "cluster_slots_ok:%d\r\n"
4457             "cluster_slots_pfail:%d\r\n"
4458             "cluster_slots_fail:%d\r\n"
4459             "cluster_known_nodes:%lu\r\n"
4460             "cluster_size:%d\r\n"
4461             "cluster_current_epoch:%llu\r\n"
4462             "cluster_my_epoch:%llu\r\n"
4463             , statestr[server.cluster->state],
4464             slots_assigned,
4465             slots_ok,
4466             slots_pfail,
4467             slots_fail,
4468             dictSize(server.cluster->nodes),
4469             server.cluster->size,
4470             (unsigned long long) server.cluster->currentEpoch,
4471             (unsigned long long) myepoch
4472         );
4473 
4474         /* Show stats about messages sent and received. */
4475         long long tot_msg_sent = 0;
4476         long long tot_msg_received = 0;
4477 
4478         for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
4479             if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
4480             tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
4481             info = sdscatprintf(info,
4482                 "cluster_stats_messages_%s_sent:%lld\r\n",
4483                 clusterGetMessageTypeString(i),
4484                 server.cluster->stats_bus_messages_sent[i]);
4485         }
4486         info = sdscatprintf(info,
4487             "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);
4488 
4489         for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
4490             if (server.cluster->stats_bus_messages_received[i] == 0) continue;
4491             tot_msg_received += server.cluster->stats_bus_messages_received[i];
4492             info = sdscatprintf(info,
4493                 "cluster_stats_messages_%s_received:%lld\r\n",
4494                 clusterGetMessageTypeString(i),
4495                 server.cluster->stats_bus_messages_received[i]);
4496         }
4497         info = sdscatprintf(info,
4498             "cluster_stats_messages_received:%lld\r\n", tot_msg_received);
4499 
4500         /* Produce the reply protocol. */
4501         addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
4502             (unsigned long)sdslen(info)));
4503         addReplySds(c,info);
4504         addReply(c,shared.crlf);
4505     } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
4506         int retval = clusterSaveConfig(1);
4507 
4508         if (retval == 0)
4509             addReply(c,shared.ok);
4510         else
4511             addReplyErrorFormat(c,"error saving the cluster node config: %s",
4512                 strerror(errno));
4513     } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
4514         /* CLUSTER KEYSLOT <key> */
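        /* For example "CLUSTER KEYSLOT foo" returns 12182. Keys sharing
         * a hash tag, such as {user1000}.following and {user1000}.followers,
         * map to the same slot because keyHashSlot() hashes only the part
         * between '{' and '}' when such a tag is present. */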
4515         sds key = c->argv[2]->ptr;
4516 
4517         addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
4518     } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
4519         /* CLUSTER COUNTKEYSINSLOT <slot> */
4520         long long slot;
4521 
4522         if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4523             return;
4524         if (slot < 0 || slot >= CLUSTER_SLOTS) {
4525             addReplyError(c,"Invalid slot");
4526             return;
4527         }
4528         addReplyLongLong(c,countKeysInSlot(slot));
4529     } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
4530         /* CLUSTER GETKEYSINSLOT <slot> <count> */
4531         long long maxkeys, slot;
4532         unsigned int numkeys, j;
4533         robj **keys;
4534 
4535         if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4536             return;
4537         if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
4538             != C_OK)
4539             return;
4540         if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
4541             addReplyError(c,"Invalid slot or number of keys");
4542             return;
4543         }
4544 
4545         /* Avoid allocating more than needed in case of large COUNT argument
4546          * and smaller actual number of keys. */
4547         unsigned int keys_in_slot = countKeysInSlot(slot);
4548         if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;
4549 
4550         keys = zmalloc(sizeof(robj*)*maxkeys);
4551         numkeys = getKeysInSlot(slot, keys, maxkeys);
4552         addReplyMultiBulkLen(c,numkeys);
4553         for (j = 0; j < numkeys; j++) {
4554             addReplyBulk(c,keys[j]);
4555             decrRefCount(keys[j]);
4556         }
4557         zfree(keys);
4558     } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
4559         /* CLUSTER FORGET <NODE ID> */
4560         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4561 
4562         if (!n) {
4563             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4564             return;
4565         } else if (n == myself) {
4566             addReplyError(c,"I tried hard but I can't forget myself...");
4567             return;
4568         } else if (nodeIsSlave(myself) && myself->slaveof == n) {
4569             addReplyError(c,"Can't forget my master!");
4570             return;
4571         }
4572         clusterBlacklistAddNode(n);
4573         clusterDelNode(n);
4574         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4575                              CLUSTER_TODO_SAVE_CONFIG);
4576         addReply(c,shared.ok);
4577     } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
4578         /* CLUSTER REPLICATE <NODE ID> */
4579         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4580 
4581         /* Lookup the specified node in our table. */
4582         if (!n) {
4583             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4584             return;
4585         }
4586 
4587         /* I can't replicate myself. */
4588         if (n == myself) {
4589             addReplyError(c,"Can't replicate myself");
4590             return;
4591         }
4592 
4593         /* Can't replicate a slave. */
4594         if (nodeIsSlave(n)) {
4595             addReplyError(c,"I can only replicate a master, not a replica.");
4596             return;
4597         }
4598 
4599         /* If the instance is currently a master, it should have no assigned
4600          * slots nor keys to accept to replicate some other node.
4601          * Slaves can switch to another master without issues. */
4602         if (nodeIsMaster(myself) &&
4603             (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
4604             addReplyError(c,
4605                 "To set a master the node must be empty and "
4606                 "without assigned slots.");
4607             return;
4608         }
4609 
4610         /* Set the master. */
4611         clusterSetMaster(n);
4612         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4613         addReply(c,shared.ok);
4614     } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
4615                 !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
4616         /* CLUSTER SLAVES <NODE ID> */
4617         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4618         int j;
4619 
4620         /* Lookup the specified node in our table. */
4621         if (!n) {
4622             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4623             return;
4624         }
4625 
4626         if (nodeIsSlave(n)) {
4627             addReplyError(c,"The specified node is not a master");
4628             return;
4629         }
4630 
4631         addReplyMultiBulkLen(c,n->numslaves);
4632         for (j = 0; j < n->numslaves; j++) {
4633             sds ni = clusterGenNodeDescription(n->slaves[j]);
4634             addReplyBulkCString(c,ni);
4635             sdsfree(ni);
4636         }
4637     } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
4638                c->argc == 3)
4639     {
4640         /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
4641         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4642 
4643         if (!n) {
4644             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4645             return;
4646         } else {
4647             addReplyLongLong(c,clusterNodeFailureReportsCount(n));
4648         }
4649     } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
4650                (c->argc == 2 || c->argc == 3))
4651     {
4652         /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
4653         int force = 0, takeover = 0;
4654 
4655         if (c->argc == 3) {
4656             if (!strcasecmp(c->argv[2]->ptr,"force")) {
4657                 force = 1;
4658             } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
4659                 takeover = 1;
4660                 force = 1; /* Takeover also implies force. */
4661             } else {
4662                 addReply(c,shared.syntaxerr);
4663                 return;
4664             }
4665         }
4666 
4667         /* Check preconditions. */
4668         if (nodeIsMaster(myself)) {
4669             addReplyError(c,"You should send CLUSTER FAILOVER to a replica");
4670             return;
4671         } else if (myself->slaveof == NULL) {
4672             addReplyError(c,"I'm a replica but my master is unknown to me");
4673             return;
4674         } else if (!force &&
4675                    (nodeFailed(myself->slaveof) ||
4676                     myself->slaveof->link == NULL))
4677         {
4678             addReplyError(c,"Master is down or failed, "
4679                             "please use CLUSTER FAILOVER FORCE");
4680             return;
4681         }
4682         resetManualFailover();
4683         server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
4684 
4685         if (takeover) {
4686             /* A takeover does not perform any initial check. It just
4687              * generates a new configuration epoch for this node without
4688              * consensus, claims the master's slots, and broadcasts the new
4689              * configuration. */
4690             serverLog(LL_WARNING,"Taking over the master (user request).");
4691             clusterBumpConfigEpochWithoutConsensus();
4692             clusterFailoverReplaceYourMaster();
4693         } else if (force) {
4694             /* If this is a forced failover, we don't need to talk with our
4695              * master to agree about the offset. We just failover taking over
4696              * it without coordination. */
4697             serverLog(LL_WARNING,"Forced failover user request accepted.");
4698             server.cluster->mf_can_start = 1;
4699         } else {
4700             serverLog(LL_WARNING,"Manual failover user request accepted.");
4701             clusterSendMFStart(myself->slaveof);
4702         }
4703         addReply(c,shared.ok);
4704     } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
4705     {
4706         /* CLUSTER SET-CONFIG-EPOCH <epoch>
4707          *
4708          * The user is allowed to set the config epoch only when a node is
4709          * totally fresh: no config epoch, no other known node, and so forth.
4710          * This happens at cluster creation time to start with a cluster where
4711          * every node has a different node ID, without relying on the conflict
4712          * resolution system, which is too slow when a big cluster is created. */
4713         long long epoch;
4714 
4715         if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
4716             return;
4717 
4718         if (epoch < 0) {
4719             addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
4720         } else if (dictSize(server.cluster->nodes) > 1) {
4721             addReplyError(c,"The user can assign a config epoch only when the "
4722                             "node does not know any other node.");
4723         } else if (myself->configEpoch != 0) {
4724             addReplyError(c,"Node config epoch is already non-zero");
4725         } else {
4726             myself->configEpoch = epoch;
4727             serverLog(LL_WARNING,
4728                 "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
4729                 (unsigned long long) myself->configEpoch);
4730 
4731             if (server.cluster->currentEpoch < (uint64_t)epoch)
4732                 server.cluster->currentEpoch = epoch;
4733             /* No need to fsync the config here since in the unlucky event
4734              * of a failure to persist the config, the conflict resolution code
4735              * will assign a unique config epoch to this node. */
4736             clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4737                                  CLUSTER_TODO_SAVE_CONFIG);
4738             addReply(c,shared.ok);
4739         }
4740     } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
4741                (c->argc == 2 || c->argc == 3))
4742     {
4743         /* CLUSTER RESET [SOFT|HARD] */
4744         int hard = 0;
4745 
4746         /* Parse soft/hard argument. Default is soft. */
4747         if (c->argc == 3) {
4748             if (!strcasecmp(c->argv[2]->ptr,"hard")) {
4749                 hard = 1;
4750             } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
4751                 hard = 0;
4752             } else {
4753                 addReply(c,shared.syntaxerr);
4754                 return;
4755             }
4756         }
4757 
4758         /* Slaves can be reset while containing data, but not master nodes
4759          * that must be empty. */
4760         if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
4761             addReplyError(c,"CLUSTER RESET can't be called with "
4762                             "master nodes containing keys");
4763             return;
4764         }
4765         clusterReset(hard);
4766         addReply(c,shared.ok);
4767     } else {
4768         addReplySubcommandSyntaxError(c);
4769         return;
4770     }
4771 }
4772 
4773 /* -----------------------------------------------------------------------------
4774  * DUMP, RESTORE and MIGRATE commands
4775  * -------------------------------------------------------------------------- */
4776 
4777 /* Generates a DUMP-format representation of the object 'o', adding it to the
4778  * io stream pointed by 'rio'. This function can't fail. */
4779 void createDumpPayload(rio *payload, robj *o, robj *key) {
4780     unsigned char buf[2];
4781     uint64_t crc;
4782 
4783     /* Serialize the object in an RDB-like format. It consists of an object
4784      * type byte followed by the serialized object, understood by RESTORE. */
4785     rioInitWithBuffer(payload,sdsempty());
4786     serverAssert(rdbSaveObjectType(payload,o));
4787     serverAssert(rdbSaveObject(payload,o,key));
4788 
4789     /* Write the footer; this is how it looks:
4790      * ----------------+---------------------+---------------+
4791      * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
4792      * ----------------+---------------------+---------------+
4793      * RDB version and CRC are both in little endian.
4794      */
4795 
4796     /* RDB version */
4797     buf[0] = RDB_VERSION & 0xff;
4798     buf[1] = (RDB_VERSION >> 8) & 0xff;
4799     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
4800 
4801     /* CRC64 */
4802     crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
4803                 sdslen(payload->io.buffer.ptr));
4804     memrev64ifbe(&crc);
4805     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
4806 }
4807 
4808 /* Verify that the RDB version of the dump payload matches the one of this Redis
4809  * instance and that the checksum is ok.
4810  * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
4811  * is returned. */
4812 int verifyDumpPayload(unsigned char *p, size_t len) {
4813     unsigned char *footer;
4814     uint16_t rdbver;
4815     uint64_t crc;
4816 
4817     /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
4818     if (len < 10) return C_ERR;
4819     footer = p+(len-10);
4820 
4821     /* Verify RDB version */
4822     rdbver = (footer[1] << 8) | footer[0];
4823     if (rdbver > RDB_VERSION) return C_ERR;
4824 
4825     /* Verify CRC64 */
4826     crc = crc64(0,p,len-8);
4827     memrev64ifbe(&crc);
4828     return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
4829 }
4830 
4831 /* DUMP keyname
4832  * DUMP is actually not used by Redis Cluster but it is the obvious
4833  * complement of RESTORE and can be useful for different applications. */
4834 void dumpCommand(client *c) {
4835     robj *o, *dumpobj;
4836     rio payload;
4837 
4838     /* Check if the key is here. */
4839     if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
4840         addReply(c,shared.nullbulk);
4841         return;
4842     }
4843 
4844     /* Create the DUMP encoded representation. */
4845     createDumpPayload(&payload,o,c->argv[1]);
4846 
4847     /* Transfer to the client */
4848     dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
4849     addReplyBulk(c,dumpobj);
4850     decrRefCount(dumpobj);
4851     return;
4852 }
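
/* Illustrative round trip (payload bytes are indicative only, with the
 * trailing CRC elided):
 *
 *   127.0.0.1:6379> SET foo bar
 *   OK
 *   127.0.0.1:6379> DUMP foo
 *   "\x00\x03bar\t\x00..."
 *   127.0.0.1:6379> RESTORE foo2 0 "\x00\x03bar\t\x00..."
 *   OK
 */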
4853 
4854 /* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */
4855 void restoreCommand(client *c) {
4856     long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
4857     rio payload;
4858     int j, type, replace = 0, absttl = 0;
4859     robj *obj;
4860 
4861     /* Parse additional options */
4862     for (j = 4; j < c->argc; j++) {
4863         int additional = c->argc-j-1;
4864         if (!strcasecmp(c->argv[j]->ptr,"replace")) {
4865             replace = 1;
4866         } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
4867             absttl = 1;
4868         } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
4869                    lfu_freq == -1)
4870         {
4871             if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
4872                     != C_OK) return;
4873             if (lru_idle < 0) {
4874                 addReplyError(c,"Invalid IDLETIME value, must be >= 0");
4875                 return;
4876             }
4877             lru_clock = LRU_CLOCK();
4878             j++; /* Consume additional arg. */
4879         } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
4880                    lru_idle == -1)
4881         {
4882             if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
4883                     != C_OK) return;
4884             if (lfu_freq < 0 || lfu_freq > 255) {
4885                 addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
4886                 return;
4887             }
4888             j++; /* Consume additional arg. */
4889         } else {
4890             addReply(c,shared.syntaxerr);
4891             return;
4892         }
4893     }
4894 
4895     /* Make sure this key does not already exist here... */
4896     if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
4897         addReply(c,shared.busykeyerr);
4898         return;
4899     }
4900 
4901     /* Check if the TTL value makes sense */
4902     if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
4903         return;
4904     } else if (ttl < 0) {
4905         addReplyError(c,"Invalid TTL value, must be >= 0");
4906         return;
4907     }
4908 
4909     /* Verify RDB version and data checksum. */
4910     if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
4911     {
4912         addReplyError(c,"DUMP payload version or checksum are wrong");
4913         return;
4914     }
4915 
4916     rioInitWithBuffer(&payload,c->argv[3]->ptr);
4917     if (((type = rdbLoadObjectType(&payload)) == -1) ||
4918         ((obj = rdbLoadObject(type,&payload,c->argv[1])) == NULL))
4919     {
4920         addReplyError(c,"Bad data format");
4921         return;
4922     }
4923 
4924     /* Remove the old key if needed. */
4925     if (replace) dbDelete(c->db,c->argv[1]);
4926 
4927     /* Create the key and set the TTL if any */
4928     dbAdd(c->db,c->argv[1],obj);
4929     if (ttl) {
4930         if (!absttl) ttl+=mstime();
4931         setExpire(c,c->db,c->argv[1],ttl);
4932     }
4933     objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
4934     signalModifiedKey(c->db,c->argv[1]);
4935     addReply(c,shared.ok);
4936     server.dirty++;
4937 }
4938 
4939 /* MIGRATE socket cache implementation.
4940  *
4941  * We keep a map between host:port and a TCP socket that we used to connect
4942  * to that instance recently.
4943  * These sockets are closed when the max number we cache is reached, and also
4944  * in serverCron() when they have been idle for more than a few seconds. */
4945 #define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
4946 #define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
4947 
4948 typedef struct migrateCachedSocket {
4949     int fd;
4950     long last_dbid;
4951     time_t last_use_time;
4952 } migrateCachedSocket;
4953 
4954 /* Return a migrateCachedSocket containing a TCP socket connected to the
4955  * target instance, possibly returning a cached one.
4956  *
4957  * This function is responsible for sending errors to the client if a
4958  * connection can't be established. In this case NULL is returned.
4959  * Otherwise on success the socket is returned, and the caller should not
4960  * attempt to free it after usage.
4961  *
4962  * If the caller detects an error while using the socket, migrateCloseSocket()
4963  * should be called so that the connection will be created from scratch
4964  * the next time. */
4965 migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
4966     int fd;
4967     sds name = sdsempty();
4968     migrateCachedSocket *cs;
4969 
4970     /* Check if we have an already cached socket for this ip:port pair. */
4971     name = sdscatlen(name,host->ptr,sdslen(host->ptr));
4972     name = sdscatlen(name,":",1);
4973     name = sdscatlen(name,port->ptr,sdslen(port->ptr));
4974     cs = dictFetchValue(server.migrate_cached_sockets,name);
4975     if (cs) {
4976         sdsfree(name);
4977         cs->last_use_time = server.unixtime;
4978         return cs;
4979     }
4980 
4981     /* No cached socket, create one. */
4982     if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
4983         /* Too many items, drop one at random. */
4984         dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
4985         cs = dictGetVal(de);
4986         close(cs->fd);
4987         zfree(cs);
4988         dictDelete(server.migrate_cached_sockets,dictGetKey(de));
4989     }
4990 
4991     /* Create the socket */
4992     fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
4993                                 atoi(c->argv[2]->ptr));
4994     if (fd == -1) {
4995         sdsfree(name);
4996         addReplyErrorFormat(c,"Can't connect to target node: %s",
4997             server.neterr);
4998         return NULL;
4999     }
5000     anetEnableTcpNoDelay(server.neterr,fd);
5001 
5002     /* Check if it connects within the specified timeout. */
5003     if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
5004         sdsfree(name);
5005         addReplySds(c,
5006             sdsnew("-IOERR error or timeout connecting to the client\r\n"));
5007         close(fd);
5008         return NULL;
5009     }
5010 
5011     /* Add to the cache and return it to the caller. */
5012     cs = zmalloc(sizeof(*cs));
5013     cs->fd = fd;
5014     cs->last_dbid = -1;
5015     cs->last_use_time = server.unixtime;
5016     dictAdd(server.migrate_cached_sockets,name,cs);
5017     return cs;
5018 }
5019 
5020 /* Free a migrate cached connection. */
5021 void migrateCloseSocket(robj *host, robj *port) {
5022     sds name = sdsempty();
5023     migrateCachedSocket *cs;
5024 
5025     name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5026     name = sdscatlen(name,":",1);
5027     name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5028     cs = dictFetchValue(server.migrate_cached_sockets,name);
5029     if (!cs) {
5030         sdsfree(name);
5031         return;
5032     }
5033 
5034     close(cs->fd);
5035     zfree(cs);
5036     dictDelete(server.migrate_cached_sockets,name);
5037     sdsfree(name);
5038 }
5039 
5040 void migrateCloseTimedoutSockets(void) {
5041     dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
5042     dictEntry *de;
5043 
5044     while((de = dictNext(di)) != NULL) {
5045         migrateCachedSocket *cs = dictGetVal(de);
5046 
5047         if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
5048             close(cs->fd);
5049             zfree(cs);
5050             dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5051         }
5052     }
5053     dictReleaseIterator(di);
5054 }
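/* A sketch of how the periodic sweep above is driven: serverCron() in
 * server.c calls it roughly once per second (simplified, for illustration):
 *
 *   run_with_period(1000) migrateCloseTimedoutSockets();
 */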
5055 
5056 /* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password]
5057  *
5058  * Or, in the multiple keys form:
5059  *
5060  * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password] KEYS key1
5061  * key2 ... keyN */
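/* Example invocations (illustrative host, port and key names):
 *
 *   MIGRATE 192.168.1.10 6379 mykey 0 5000
 *   MIGRATE 192.168.1.10 6379 "" 0 5000 REPLACE KEYS user:1 user:2 user:3
 *
 * The first form moves a single key to database 0 of the target with a
 * 5000 millisecond timeout; the second uses the empty key argument plus
 * the KEYS option to pipeline several keys in a single call. */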
5062 void migrateCommand(client *c) {
5063     migrateCachedSocket *cs;
5064     int copy = 0, replace = 0, j;
5065     char *password = NULL;
5066     long timeout;
5067     long dbid;
5068     robj **ov = NULL; /* Objects to migrate. */
5069     robj **kv = NULL; /* Key names. */
5070     robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
5071     rio cmd, payload;
5072     int may_retry = 1;
5073     int write_error = 0;
5074     int argv_rewritten = 0;
5075 
5076     /* To support the KEYS option we need the following additional state. */
5077     int first_key = 3; /* Argument index of the first key. */
5078     int num_keys = 1;  /* By default only migrate the 'key' argument. */
5079 
5080     /* Parse additional options */
5081     for (j = 6; j < c->argc; j++) {
5082         int moreargs = j < c->argc-1;
5083         if (!strcasecmp(c->argv[j]->ptr,"copy")) {
5084             copy = 1;
5085         } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
5086             replace = 1;
5087         } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
5088             if (!moreargs) {
5089                 addReply(c,shared.syntaxerr);
5090                 return;
5091             }
5092             j++;
5093             password = c->argv[j]->ptr;
5094         } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
5095             if (sdslen(c->argv[3]->ptr) != 0) {
5096                 addReplyError(c,
5097                     "When using MIGRATE KEYS option, the key argument"
5098                     " must be set to the empty string");
5099                 return;
5100             }
5101             first_key = j+1;
5102             num_keys = c->argc - j - 1;
5103             break; /* All the remaining args are keys. */
5104         } else {
5105             addReply(c,shared.syntaxerr);
5106             return;
5107         }
5108     }
5109 
5110     /* Sanity check */
5111     if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
5112         getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
5113     {
5114         return;
5115     }
5116     if (timeout <= 0) timeout = 1000;
5117 
5118     /* Check if the keys are here. If at least one key exists, migrate it;
5119      * otherwise, if all the keys are missing, reply with "NOKEY" to signal
5120      * the caller that there was nothing to migrate. We don't return an error
5121      * in this case, since often this is due to a normal condition like the
5122      * key expiring in the meantime. */
5123     ov = zrealloc(ov,sizeof(robj*)*num_keys);
5124     kv = zrealloc(kv,sizeof(robj*)*num_keys);
5125     int oi = 0;
5126 
5127     for (j = 0; j < num_keys; j++) {
5128         if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
5129             kv[oi] = c->argv[first_key+j];
5130             oi++;
5131         }
5132     }
5133     num_keys = oi;
5134     if (num_keys == 0) {
5135         zfree(ov); zfree(kv);
5136         addReplySds(c,sdsnew("+NOKEY\r\n"));
5137         return;
5138     }
5139 
5140 try_again:
5141     write_error = 0;
5142 
5143     /* Connect */
5144     cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
5145     if (cs == NULL) {
5146         zfree(ov); zfree(kv);
5147         return; /* error sent to the client by migrateGetSocket() */
5148     }
5149 
5150     rioInitWithBuffer(&cmd,sdsempty());
5151 
5152     /* Authentication */
5153     if (password) {
5154         serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
5155         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
5156         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
5157             sdslen(password)));
5158     }
5159 
5160     /* Send the SELECT command if the current DB is not already selected. */
5161     int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
5162     if (select) {
5163         serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
5164         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
5165         serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
5166     }
5167 
5168     int non_expired = 0; /* Number of keys that we'll find non expired.
5169                             Note that serializing large keys may take some
5170                             time, so keys that the lookupKey() call found
5171                             non expired may have expired by now. */
5172 
5173     /* Create RESTORE payload and generate the protocol to call the command. */
5174     for (j = 0; j < num_keys; j++) {
5175         long long ttl = 0;
5176         long long expireat = getExpire(c->db,kv[j]);
5177 
5178         if (expireat != -1) {
5179             ttl = expireat-mstime();
5180             if (ttl < 0) {
5181                 continue;
5182             }
5183             if (ttl < 1) ttl = 1;
5184         }
5185 
5186         /* Relocate valid (non expired) keys into the array in successive
5187          * positions to remove holes created by the keys that were present
5188          * in the first lookup but are now expired after the second lookup. */
5189         kv[non_expired++] = kv[j];
5190 
5191         serverAssertWithInfo(c,NULL,
5192             rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
5193 
5194         if (server.cluster_enabled)
5195             serverAssertWithInfo(c,NULL,
5196                 rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
5197         else
5198             serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
5199         serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
5200         serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
5201                 sdslen(kv[j]->ptr)));
5202         serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
5203 
5204         /* Emit the payload argument, that is the serialized object using
5205          * the DUMP format. */
5206         createDumpPayload(&payload,ov[j],kv[j]);
5207         serverAssertWithInfo(c,NULL,
5208             rioWriteBulkString(&cmd,payload.io.buffer.ptr,
5209                                sdslen(payload.io.buffer.ptr)));
5210         sdsfree(payload.io.buffer.ptr);
5211 
5212         /* Add the REPLACE option to the RESTORE command if it was specified
5213          * as a MIGRATE option. */
5214         if (replace)
5215             serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
5216     }
5217 
5218     /* Fix the actual number of keys we are migrating. */
5219     num_keys = non_expired;
5220 
5221     /* Transfer the query to the other node in 64K chunks. */
5222     errno = 0;
5223     {
5224         sds buf = cmd.io.buffer.ptr;
5225         size_t pos = 0, towrite;
5226         int nwritten = 0;
5227 
5228         while ((towrite = sdslen(buf)-pos) > 0) {
5229             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
5230             nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
5231             if (nwritten != (signed)towrite) {
5232                 write_error = 1;
5233                 goto socket_err;
5234             }
5235             pos += nwritten;
5236         }
5237     }
5238 
5239     char buf0[1024]; /* Auth reply. */
5240     char buf1[1024]; /* Select reply. */
5241     char buf2[1024]; /* Restore reply. */
5242 
5243     /* Read the AUTH reply if needed. */
5244     if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0)
5245         goto socket_err;
5246 
5247     /* Read the SELECT reply if needed. */
5248     if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
5249         goto socket_err;
5250 
5251     /* Read the RESTORE replies. */
5252     int error_from_target = 0;
5253     int socket_error = 0;
5254     int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
5255 
5256     /* Allocate the new argument vector that will replace the current command,
5257      * to propagate the MIGRATE as a DEL command (if no COPY option was given).
5258      * We allocate num_keys+1 because the additional argument is for "DEL"
5259      * command name itself. */
5260     if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
5261 
5262     for (j = 0; j < num_keys; j++) {
5263         if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
5264             socket_error = 1;
5265             break;
5266         }
5267         if ((password && buf0[0] == '-') ||
5268             (select && buf1[0] == '-') ||
5269             buf2[0] == '-')
5270         {
5271             /* On error assume that last_dbid is no longer valid. */
5272             if (!error_from_target) {
5273                 cs->last_dbid = -1;
5274                 char *errbuf;
5275                 if (password && buf0[0] == '-') errbuf = buf0;
5276                 else if (select && buf1[0] == '-') errbuf = buf1;
5277                 else errbuf = buf2;
5278 
5279                 error_from_target = 1;
5280                 addReplyErrorFormat(c,"Target instance replied with error: %s",
5281                     errbuf+1);
5282             }
5283         } else {
5284             if (!copy) {
5285                 /* No COPY option: remove the local key, signal the change. */
5286                 dbDelete(c->db,kv[j]);
5287                 signalModifiedKey(c->db,kv[j]);
5288                 server.dirty++;
5289 
5290                 /* Populate the argument vector to replace the old one. */
5291                 newargv[del_idx++] = kv[j];
5292                 incrRefCount(kv[j]);
5293             }
5294         }
5295     }
5296 
5297     /* On socket error, if we want to retry, do it now before rewriting the
5298      * command vector. We only retry if we are sure nothing was processed
5299      * and we failed to read the first reply (j == 0 test). */
5300     if (!error_from_target && socket_error && j == 0 && may_retry &&
5301         errno != ETIMEDOUT)
5302     {
5303         goto socket_err; /* A retry is guaranteed because of tested conditions.*/
5304     }
5305 
5306     /* On socket errors, close the migration socket now, while we still have
5307      * the original host/port in the ARGV. Later the original command may be
5308      * rewritten to DEL, and by then it would be too late. */
5309     if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
5310 
5311     if (!copy) {
5312         /* Translate MIGRATE as DEL for replication/AOF. Note that we do
5313          * this only for the keys for which we received an acknowledgement
5314          * from the receiving Redis server, by using the del_idx index. */
5315         if (del_idx > 1) {
5316             newargv[0] = createStringObject("DEL",3);
5317             /* Note that the following call takes ownership of newargv. */
5318             replaceClientCommandVector(c,del_idx,newargv);
5319             argv_rewritten = 1;
5320         } else {
5321             /* No key transfer acknowledged, no need to rewrite as DEL. */
5322             zfree(newargv);
5323         }
5324         newargv = NULL; /* Make it safe to call zfree() on it in the future. */
5325     }
5326 
5327     /* If we are here and a socket error happened, we don't want to retry.
5328      * Just signal the problem to the client, but only do it if we did not
5329      * already queue a different error reported by the destination server. */
5330     if (!error_from_target && socket_error) {
5331         may_retry = 0;
5332         goto socket_err;
5333     }
5334 
5335     if (!error_from_target) {
5336         /* Success! Update the last_dbid in migrateCachedSocket, so that we can
5337          * avoid SELECT the next time if the target DB is the same. Reply +OK.
5338          *
5339          * Note: If we reached this point, even if socket_error is true
5340      * still the SELECT command succeeded (otherwise the code jumps to
5341      * the socket_err label). */
5342         cs->last_dbid = dbid;
5343         addReply(c,shared.ok);
5344     } else {
5345         /* On error we already sent it in the for loop above, and set
5346          * last_dbid to -1 to force a SELECT the next time. */
5347     }
5348 
5349     sdsfree(cmd.io.buffer.ptr);
5350     zfree(ov); zfree(kv); zfree(newargv);
5351     return;
5352 
5353 /* On socket errors we try to close the cached socket and try again.
5354  * It is very common for the cached socket to get closed; if just reopening
5355  * it works, there is no reason to report the error to the caller. */
5356 socket_err:
5357     /* Cleanup we want to perform in both the retry and no retry case.
5358      * Note: Closing the migrate socket will also force SELECT next time. */
5359     sdsfree(cmd.io.buffer.ptr);
5360 
5361     /* If the command was rewritten as DEL and there was a socket error,
5362      * we already closed the socket earlier. While migrateCloseSocket()
5363      * is idempotent, the host/port arguments are now gone, so don't do it
5364      * again. */
5365     if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
5366     zfree(newargv);
5367     newargv = NULL; /* This will get reallocated on retry. */
5368 
5369     /* Retry only if it's not a timeout and we never attempted a retry
5370      * (or the code jumping here did not set may_retry to zero). */
5371     if (errno != ETIMEDOUT && may_retry) {
5372         may_retry = 0;
5373         goto try_again;
5374     }
5375 
5376     /* Cleanup we want to do if no retry is attempted. */
5377     zfree(ov); zfree(kv);
5378     addReplySds(c,
5379         sdscatprintf(sdsempty(),
5380             "-IOERR error or timeout %s to target instance\r\n",
5381             write_error ? "writing" : "reading"));
5382     return;
5383 }
5384 
5385 /* -----------------------------------------------------------------------------
5386  * Cluster functions related to serving / redirecting clients
5387  * -------------------------------------------------------------------------- */
5388 
5389 /* The ASKING command is required after a -ASK redirection.
5390  * The client should issue ASKING before actually sending the command to
5391  * the target instance. See the Redis Cluster specification for more
5392  * information. */
5393 void askingCommand(client *c) {
5394     if (server.cluster_enabled == 0) {
5395         addReplyError(c,"This instance has cluster support disabled");
5396         return;
5397     }
5398     c->flags |= CLIENT_ASKING;
5399     addReply(c,shared.ok);
5400 }
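/* A typical client exchange after receiving "-ASK <slot> <ip>:<port>"
 * (illustrative): the client connects to the indicated node and sends
 *
 *   ASKING
 *   GET foo
 *
 * so that the importing node serves the next command even though the
 * slot is not yet bound to it. */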
5401 
5402 /* The READONLY command is used by clients to enter the read-only mode.
5403  * In this mode slaves will not redirect clients as long as clients use
5404  * read-only commands to access keys that are served by the slave's master. */
5405 void readonlyCommand(client *c) {
5406     if (server.cluster_enabled == 0) {
5407         addReplyError(c,"This instance has cluster support disabled");
5408         return;
5409     }
5410     c->flags |= CLIENT_READONLY;
5411     addReply(c,shared.ok);
5412 }
5413 
5414 /* The READWRITE command just clears the READONLY command state. */
5415 void readwriteCommand(client *c) {
5416     c->flags &= ~CLIENT_READONLY;
5417     addReply(c,shared.ok);
5418 }
5419 
5420 /* Return the pointer to the cluster node that is able to serve the command.
5421  * For the function to succeed the command should only target either:
5422  *
5423  * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
5424  * 2) Multiple keys in the same hash slot, while the slot is stable (no
5425  *    resharding in progress).
5426  *
5427  * On success the function returns the node that is able to serve the request.
5428  * If the node is not 'myself' a redirection must be performed. The kind of
5429  * redirection is specified by setting the integer passed by reference
5430  * 'error_code', which will be set to CLUSTER_REDIR_ASK or
5431  * CLUSTER_REDIR_MOVED.
5432  *
5433  * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
5434  *
5435  * If the command fails NULL is returned, and the reason of the failure is
5436  * provided via 'error_code', which will be set to:
5437  *
5438  * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
5439  * don't belong to the same hash slot.
5440  *
5441  * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
5442  * belonging to the same slot, but the slot is not stable (in migration or
5443  * importing state, likely because a resharding is in progress).
5444  *
5445  * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
5446  * not bound to any node. In this case the cluster global state should
5447  * already be "down", but relying on the global state being updated in
5448  * time is fragile, so we also handle it here.
5449  *
5450  * CLUSTER_REDIR_DOWN_STATE if the cluster is down but the user attempts to
5451  * execute a command that addresses one or more keys. */
5452 clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
5453     clusterNode *n = NULL;
5454     robj *firstkey = NULL;
5455     int multiple_keys = 0;
5456     multiState *ms, _ms;
5457     multiCmd mc;
5458     int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
5459 
5460     /* Allow any key to be set if a module disabled cluster redirections. */
5461     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
5462         return myself;
5463 
5464     /* Set error code optimistically for the base case. */
5465     if (error_code) *error_code = CLUSTER_REDIR_NONE;
5466 
5467     /* Modules can turn off Redis Cluster redirection: this is useful
5468      * when writing a module that implements a completely different
5469      * distributed system. */
5470 
5471     /* We handle all the cases as if they were EXEC commands, so we have
5472      * a common code path for everything */
5473     if (cmd->proc == execCommand) {
5474         /* If CLIENT_MULTI flag is not set EXEC is just going to return an
5475          * error. */
5476         if (!(c->flags & CLIENT_MULTI)) return myself;
5477         ms = &c->mstate;
5478     } else {
5479         /* In order to have a single codepath create a fake Multi State
5480          * structure if the client is not in MULTI/EXEC state, this way
5481          * we have a single codepath below. */
5482         ms = &_ms;
5483         _ms.commands = &mc;
5484         _ms.count = 1;
5485         mc.argv = argv;
5486         mc.argc = argc;
5487         mc.cmd = cmd;
5488     }
5489 
5490     /* Check that all the keys are in the same hash slot, and obtain this
5491      * slot and the node associated. */
5492     for (i = 0; i < ms->count; i++) {
5493         struct redisCommand *mcmd;
5494         robj **margv;
5495         int margc, *keyindex, numkeys, j;
5496 
5497         mcmd = ms->commands[i].cmd;
5498         margc = ms->commands[i].argc;
5499         margv = ms->commands[i].argv;
5500 
5501         keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
5502         for (j = 0; j < numkeys; j++) {
5503             robj *thiskey = margv[keyindex[j]];
5504             int thisslot = keyHashSlot((char*)thiskey->ptr,
5505                                        sdslen(thiskey->ptr));
5506 
5507             if (firstkey == NULL) {
5508                 /* This is the first key we see. Check what its slot
5509                  * is and which node serves it. */
5510                 firstkey = thiskey;
5511                 slot = thisslot;
5512                 n = server.cluster->slots[slot];
5513 
5514                 /* Error: If a slot is not served, we are in "cluster down"
5515                  * state. However the state is yet to be updated, so this was
5516                  * not trapped earlier in processCommand(). Report the same
5517                  * error to the client. */
5518                 if (n == NULL) {
5519                     getKeysFreeResult(keyindex);
5520                     if (error_code)
5521                         *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
5522                     return NULL;
5523                 }
5524 
5525                 /* If we are migrating or importing this slot, we need to check
5526                  * if we have all the keys in the request (the only way we
5527                  * can safely serve the request, otherwise we return a TRYAGAIN
5528                  * error). To do so we set the importing/migrating state and
5529                  * increment a counter for every missing key. */
5530                 if (n == myself &&
5531                     server.cluster->migrating_slots_to[slot] != NULL)
5532                 {
5533                     migrating_slot = 1;
5534                 } else if (server.cluster->importing_slots_from[slot] != NULL) {
5535                     importing_slot = 1;
5536                 }
5537             } else {
5538                 /* If it is not the first key, make sure it is exactly
5539                  * the same key as the first we saw. */
5540                 if (!equalStringObjects(firstkey,thiskey)) {
5541                     if (slot != thisslot) {
5542                         /* Error: multiple keys from different slots. */
5543                         getKeysFreeResult(keyindex);
5544                         if (error_code)
5545                             *error_code = CLUSTER_REDIR_CROSS_SLOT;
5546                         return NULL;
5547                     } else {
5548                         /* Flag this request as one with multiple different
5549                          * keys. */
5550                         multiple_keys = 1;
5551                     }
5552                 }
5553             }
5554 
5555             /* Migrating / Importing slot? Count keys we don't have. */
5556             if ((migrating_slot || importing_slot) &&
5557                 lookupKeyRead(&server.db[0],thiskey) == NULL)
5558             {
5559                 missing_keys++;
5560             }
5561         }
5562         getKeysFreeResult(keyindex);
5563     }
5564 
5565     /* No key at all in the command? Then we can serve the request
5566      * without redirections or errors in all the cases. */
5567     if (n == NULL) return myself;
5568 
5569     /* Cluster is globally down but we got keys? We can't serve the request. */
5570     if (server.cluster->state != CLUSTER_OK) {
5571         if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
5572         return NULL;
5573     }
5574 
5575     /* Return the hashslot by reference. */
5576     if (hashslot) *hashslot = slot;
5577 
5578     /* MIGRATE always works in the context of the local node if the slot
5579      * is open (migrating or importing state). We need to be able to freely
5580      * move keys among instances in this case. */
5581     if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
5582         return myself;
5583 
5584     /* If we don't have all the keys and we are migrating the slot, send
5585      * an ASK redirection. */
5586     if (migrating_slot && missing_keys) {
5587         if (error_code) *error_code = CLUSTER_REDIR_ASK;
5588         return server.cluster->migrating_slots_to[slot];
5589     }
5590 
5591     /* If we are receiving the slot, and the client correctly flagged the
5592      * request as "ASKING", we can serve the request. However if the request
5593      * involves multiple keys and we don't have them all, the only option is
5594      * to send a TRYAGAIN error. */
5595     if (importing_slot &&
5596         (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
5597     {
5598         if (multiple_keys && missing_keys) {
5599             if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
5600             return NULL;
5601         } else {
5602             return myself;
5603         }
5604     }
5605 
5606     /* Handle the read-only client case reading from a slave: if this
5607      * node is a slave and the request is about a hash slot our master
5608      * is serving, we can reply without redirection. */
5609     if (c->flags & CLIENT_READONLY &&
5610         (cmd->flags & CMD_READONLY || cmd->proc == evalCommand ||
5611          cmd->proc == evalShaCommand) &&
5612         nodeIsSlave(myself) &&
5613         myself->slaveof == n)
5614     {
5615         return myself;
5616     }
5617 
5618     /* Base case: just return the right node. However if this node is not
5619      * myself, set error_code to MOVED since we need to issue a redirection. */
5620     if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
5621     return n;
5622 }
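/* Multi-key requests succeed above only when every key hashes to the same
 * slot. Clients usually force this with hash tags, since keyHashSlot()
 * hashes only the substring between '{' and '}' when present. Illustrative:
 *
 *   MSET {user:1000}:name foo {user:1000}:age 42   <- one slot, served
 *   MSET user:1000 user:1001                       <- likely -CROSSSLOT
 */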
5623 
5624 /* Send the client the right redirection code, according to error_code
5625  * that should be set to one of CLUSTER_REDIR_* macros.
5626  *
5627  * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
5628  * are used, then the node 'n' should not be NULL, but should be the
5629  * node we want to mention in the redirection. Moreover hashslot should
5630  * be set to the hash slot that caused the redirection. */
5631 void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
5632     if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
5633         addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
5634     } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
5635         /* The request spans multiple keys in the same slot,
5636          * but the slot is not "stable" currently as there is
5637          * a migration or import in progress. */
5638         addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
5639     } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
5640         addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
5641     } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
5642         addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
5643     } else if (error_code == CLUSTER_REDIR_MOVED ||
5644                error_code == CLUSTER_REDIR_ASK)
5645     {
5646         addReplySds(c,sdscatprintf(sdsempty(),
5647             "-%s %d %s:%d\r\n",
5648             (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
5649             hashslot,n->ip,n->port));
5650     } else {
5651         serverPanic("getNodeByQuery() unknown error.");
5652     }
5653 }
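/* A simplified sketch of how processCommand() in server.c drives the two
 * functions above (illustrative; the real code has additional guards for
 * masters, Lua callers and commands without keys):
 *
 *   if (server.cluster_enabled) {
 *       int hashslot, error_code;
 *       clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
 *                                       &hashslot,&error_code);
 *       if (n == NULL || n != server.cluster->myself) {
 *           clusterRedirectClient(c,n,hashslot,error_code);
 *           return C_OK;
 *       }
 *   }
 */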
5654 
5655 /* This function is called by the code that incrementally processes
5656  * clients to detect timeouts, in order to handle the following case:
5657  *
5658  * 1) A client blocks with BLPOP or similar blocking operation.
5659  * 2) The master migrates the hash slot elsewhere or turns into a slave.
5660  * 3) The client may remain blocked forever (or up to the max timeout time)
5661  *    waiting for a key change that will never happen.
5662  *
5663  * If the client is found to be blocked on a hash slot this node no
5664  * longer handles, the client is sent a redirection error, and the function
5665  * returns 1. Otherwise 0 is returned and no operation is performed. */
5666 int clusterRedirectBlockedClientIfNeeded(client *c) {
5667     if (c->flags & CLIENT_BLOCKED &&
5668         (c->btype == BLOCKED_LIST ||
5669          c->btype == BLOCKED_ZSET ||
5670          c->btype == BLOCKED_STREAM))
5671     {
5672         dictEntry *de;
5673         dictIterator *di;
5674 
5675         /* If the cluster is down, unblock the client with the right error. */
5676         if (server.cluster->state == CLUSTER_FAIL) {
5677             clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
5678             return 1;
5679         }
5680 
5681         /* All keys must belong to the same slot, so check the first key only. */
5682         di = dictGetIterator(c->bpop.keys);
5683         if ((de = dictNext(di)) != NULL) {
5684             robj *key = dictGetKey(de);
5685             int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
5686             clusterNode *node = server.cluster->slots[slot];
5687 
5688             /* We send an error and unblock the client if:
5689              * 1) The slot is unassigned, emitting a cluster down error.
5690              * 2) The slot is not handled by this node, nor being imported. */
5691             if (node != myself &&
5692                 server.cluster->importing_slots_from[slot] == NULL)
5693             {
5694                 if (node == NULL) {
5695                     clusterRedirectClient(c,NULL,0,
5696                         CLUSTER_REDIR_DOWN_UNBOUND);
5697                 } else {
5698                     clusterRedirectClient(c,node,slot,
5699                         CLUSTER_REDIR_MOVED);
5700                 }
5701                 dictReleaseIterator(di);
5702                 return 1;
5703             }
5704         }
5705         dictReleaseIterator(di);
5706     }
5707     return 0;
5708 }
5709