/* Redis Cluster implementation.
 *
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "server.h"
#include "cluster.h"
#include "endianconv.h"

#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <math.h>

/* A global reference to myself is handy to make code more clear.
 * Myself always points to server.cluster->myself, that is, the clusterNode
 * that represents this node. */
clusterNode *myself = NULL;

clusterNode *createClusterNode(char *nodename, int flags);
int clusterAddNode(clusterNode *node);
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterSendPing(clusterLink *link, int type);
void clusterSendFail(char *nodename);
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter);
clusterNode *clusterLookupNode(const char *name);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetMaster(clusterNode *n);
void clusterHandleSlaveFailover(void);
void clusterHandleSlaveMigration(int max_slaves);
int bitmapTestBit(unsigned char *bitmap, int pos);
void clusterDoBeforeSleep(int flags);
void clusterSendUpdate(clusterLink *link, clusterNode *node);
void resetManualFailover(void);
void clusterCloseAllSlots(void);
void clusterSetNodeAsMaster(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
sds representClusterNodeFlags(sds ci, uint16_t flags);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);

/* -----------------------------------------------------------------------------
 * Initialization
 * -------------------------------------------------------------------------- */

/* Load the cluster config from 'filename'.
 *
 * If the file does not exist or is zero-length (this may happen because
 * when we lock the nodes.conf file, we create a zero-length one for the
 * sake of locking if it does not already exist), C_ERR is returned.
 * If the configuration was loaded from the file, C_OK is returned. */
int clusterLoadConfig(char *filename) {
    FILE *fp = fopen(filename,"r");
    struct stat sb;
    char *line;
    int maxline, j;

    if (fp == NULL) {
        if (errno == ENOENT) {
            return C_ERR;
        } else {
            serverLog(LL_WARNING,
                "Loading the cluster node config from %s: %s",
                filename, strerror(errno));
            exit(1);
        }
    }

    /* Check if the file is zero-length: if so return C_ERR to signal
     * we have to write the config. */
    if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        fclose(fp);
        return C_ERR;
    }
    /* Parse the file. Note that single lines of the cluster config file can
     * be really long as they include all the hash slots of the node.
     * This means that in the worst possible case half of the Redis slots
     * will be present in a single line, possibly in importing or migrating
     * state, together with the node ID of the sender/receiver.
     *
     * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
    maxline = 1024+CLUSTER_SLOTS*128;
    line = zmalloc(maxline);
    while(fgets(line,maxline,fp) != NULL) {
        int argc;
        sds *argv;
        clusterNode *n, *master;
        char *p, *s;

        /* Skip blank lines, they can be created either by users manually
         * editing nodes.conf or by the config writing process if stopped
         * before the truncate() call. */
        if (line[0] == '\n' || line[0] == '\0') continue;

        /* Split the line into arguments for processing. */
        argv = sdssplitargs(line,&argc);
        if (argv == NULL) goto fmterr;

        /* Handle the special "vars" line. Don't pretend it is the last
         * line even if it actually is when generated by Redis. */
        if (strcasecmp(argv[0],"vars") == 0) {
            for (j = 1; j < argc; j += 2) {
                if (strcasecmp(argv[j],"currentEpoch") == 0) {
                    server.cluster->currentEpoch =
                        strtoull(argv[j+1],NULL,10);
                } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
                    server.cluster->lastVoteEpoch =
                        strtoull(argv[j+1],NULL,10);
                } else {
                    serverLog(LL_WARNING,
                        "Skipping unknown cluster config variable '%s'",
                        argv[j]);
                }
            }
            sdsfreesplitres(argv,argc);
            continue;
        }

        /* Regular config lines have at least eight fields */
        if (argc < 8) goto fmterr;

        /* Create this node if it does not exist */
        n = clusterLookupNode(argv[0]);
        if (!n) {
            n = createClusterNode(argv[0],0);
            clusterAddNode(n);
        }
        /* Address and port */
        if ((p = strrchr(argv[1],':')) == NULL) goto fmterr;
        *p = '\0';
        memcpy(n->ip,argv[1],strlen(argv[1])+1);
        char *port = p+1;
        char *busp = strchr(port,'@');
        if (busp) {
            *busp = '\0';
            busp++;
        }
        n->port = atoi(port);
        /* In older versions of nodes.conf the "@busport" part is missing.
         * In this case we set it to the default offset of 10000 from the
         * base port (e.g. base port 6379 implies bus port 16379). */
        n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;

        /* Parse flags */
        p = s = argv[2];
        while(p) {
            p = strchr(s,',');
            if (p) *p = '\0';
            if (!strcasecmp(s,"myself")) {
                serverAssert(server.cluster->myself == NULL);
                myself = server.cluster->myself = n;
                n->flags |= CLUSTER_NODE_MYSELF;
            } else if (!strcasecmp(s,"master")) {
                n->flags |= CLUSTER_NODE_MASTER;
            } else if (!strcasecmp(s,"slave")) {
                n->flags |= CLUSTER_NODE_SLAVE;
            } else if (!strcasecmp(s,"fail?")) {
                n->flags |= CLUSTER_NODE_PFAIL;
            } else if (!strcasecmp(s,"fail")) {
                n->flags |= CLUSTER_NODE_FAIL;
                n->fail_time = mstime();
            } else if (!strcasecmp(s,"handshake")) {
                n->flags |= CLUSTER_NODE_HANDSHAKE;
            } else if (!strcasecmp(s,"noaddr")) {
                n->flags |= CLUSTER_NODE_NOADDR;
            } else if (!strcasecmp(s,"nofailover")) {
                n->flags |= CLUSTER_NODE_NOFAILOVER;
            } else if (!strcasecmp(s,"noflags")) {
                /* nothing to do */
            } else {
                serverPanic("Unknown flag in redis cluster config file");
            }
            if (p) s = p+1;
        }

        /* Get master if any. Set the master and populate master's
         * slave list. */
        if (argv[3][0] != '-') {
            master = clusterLookupNode(argv[3]);
            if (!master) {
                master = createClusterNode(argv[3],0);
                clusterAddNode(master);
            }
            n->slaveof = master;
            clusterNodeAddSlave(master,n);
        }

        /* Set ping sent / pong received timestamps */
        if (atoi(argv[4])) n->ping_sent = mstime();
        if (atoi(argv[5])) n->pong_received = mstime();

        /* Set configEpoch for this node. */
        n->configEpoch = strtoull(argv[6],NULL,10);

        /* Populate hash slots served by this instance. */
        for (j = 8; j < argc; j++) {
            int start, stop;

            if (argv[j][0] == '[') {
                /* Here we handle migrating / importing slots */
                int slot;
                char direction;
                clusterNode *cn;

                p = strchr(argv[j],'-');
                serverAssert(p != NULL);
                *p = '\0';
                direction = p[1]; /* Either '>' or '<' */
                slot = atoi(argv[j]+1);
                if (slot < 0 || slot >= CLUSTER_SLOTS) goto fmterr;
                p += 3;
                cn = clusterLookupNode(p);
                if (!cn) {
                    cn = createClusterNode(p,0);
                    clusterAddNode(cn);
                }
                if (direction == '>') {
                    server.cluster->migrating_slots_to[slot] = cn;
                } else {
                    server.cluster->importing_slots_from[slot] = cn;
                }
                continue;
            } else if ((p = strchr(argv[j],'-')) != NULL) {
                *p = '\0';
                start = atoi(argv[j]);
                stop = atoi(p+1);
            } else {
                start = stop = atoi(argv[j]);
            }
            if (start < 0 || start >= CLUSTER_SLOTS) goto fmterr;
            if (stop < 0 || stop >= CLUSTER_SLOTS) goto fmterr;
            while(start <= stop) clusterAddSlot(n, start++);
        }

        sdsfreesplitres(argv,argc);
    }
    /* Config sanity check */
    if (server.cluster->myself == NULL) goto fmterr;

    zfree(line);
    fclose(fp);

    serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);

    /* Something that should never happen: currentEpoch smaller than
     * the max epoch found in the nodes configuration. However we handle this
     * as some form of protection against manual editing of critical files. */
    if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
        server.cluster->currentEpoch = clusterGetMaxEpoch();
    }
    return C_OK;

fmterr:
    serverLog(LL_WARNING,
        "Unrecoverable error: corrupted cluster config file.");
    zfree(line);
    if (fp) fclose(fp);
    exit(1);
}
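
/* For reference, a nodes.conf line parsed by the function above has the
 * same layout as a CLUSTER NODES output line (illustrative values, field
 * layout per the parser above):
 *
 *   07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@40004 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected
 *   e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@40001 myself,master - 0 0 1 connected 0-5460 [93->-292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f]
 *
 * Fields: node ID, address, flags, master ID (or '-'), ping sent,
 * pong received, configEpoch, link state, then the served slots, where
 * [slot->-node] marks a migrating slot and [slot-<-node] an importing one. */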

/* Cluster node configuration is exactly the same as CLUSTER NODES output.
 *
 * This function writes the node config and returns 0; on error -1
 * is returned.
 *
 * Note: we need to write the file in an atomic way from the point of view
 * of the POSIX filesystem semantics, so that if the server is stopped
 * or crashes during the write, we'll end up with either the old file or the
 * new one. Since we have the full payload to write available, we can use
 * a single write to write the whole file. If the pre-existing file was
 * bigger we pad our payload with newlines that are anyway ignored and truncate
 * the file afterward. */
int clusterSaveConfig(int do_fsync) {
    sds ci;
    size_t content_size;
    struct stat sb;
    int fd;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;

    /* Get the nodes description and concatenate our "vars" directive to
     * save currentEpoch and lastVoteEpoch. */
    ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
    ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
        (unsigned long long) server.cluster->currentEpoch,
        (unsigned long long) server.cluster->lastVoteEpoch);
    content_size = sdslen(ci);

    if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
        == -1) goto err;

    /* Pad the new payload if the existing file length is greater. */
    if (fstat(fd,&sb) != -1) {
        if (sb.st_size > (off_t)content_size) {
            ci = sdsgrowzero(ci,sb.st_size);
            memset(ci+content_size,'\n',sb.st_size-content_size);
        }
    }
    if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
    if (do_fsync) {
        server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
        fsync(fd);
    }

    /* Truncate the file if needed to remove the final \n padding that
     * is just garbage. */
    if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
        /* ftruncate() failing is not a critical error. */
    }
    close(fd);
    sdsfree(ci);
    return 0;

err:
    if (fd != -1) close(fd);
    sdsfree(ci);
    return -1;
}
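
/* Worked example of the pad-then-truncate trick above (hypothetical sizes):
 * if the pre-existing nodes.conf is 100 bytes and the new payload is 60
 * bytes, the payload is grown to 100 bytes by appending 40 '\n' bytes, all
 * 100 bytes are written with a single write(2), and the file is finally
 * ftruncate()d back to 60 bytes. Per the comment above, at every step the
 * reader sees either the old content, the new content padded with ignorable
 * newlines, or the new content alone, never a shorter partial file. */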

void clusterSaveConfigOrDie(int do_fsync) {
    if (clusterSaveConfig(do_fsync) == -1) {
        serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
        exit(1);
    }
}

/* Lock the cluster config using flock(), and leak the file descriptor used
 * to acquire the lock so that the file will be locked forever.
 *
 * This works because we always update nodes.conf in place: we reopen the
 * file and overwrite it (later adjusting the length with ftruncate()).
 *
 * On success C_OK is returned, otherwise an error is logged and
 * the function returns C_ERR to signal a lock was not acquired. */
int clusterLockConfig(char *filename) {
/* flock() does not exist on Solaris
 * and a fcntl-based solution won't help, as we constantly re-open that file,
 * which will release _all_ locks anyway
 */
#if !defined(__sun)
    /* To lock it, we need to open the file in a way it is created if
     * it does not exist, otherwise there is a race condition with other
     * processes. */
    int fd = open(filename,O_WRONLY|O_CREAT,0644);
    if (fd == -1) {
        serverLog(LL_WARNING,
            "Can't open %s in order to acquire a lock: %s",
            filename, strerror(errno));
        return C_ERR;
    }

    if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
        if (errno == EWOULDBLOCK) {
            serverLog(LL_WARNING,
                "Sorry, the cluster configuration file %s is already used "
                "by a different Redis Cluster node. Please make sure that "
                "different nodes use different cluster configuration "
                "files.", filename);
        } else {
            serverLog(LL_WARNING,
                "Impossible to lock %s: %s", filename, strerror(errno));
        }
        close(fd);
        return C_ERR;
    }
    /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
     * lock to the file as long as the process exists. */
#endif /* __sun */

    return C_OK;
}
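
/* Minimal sketch of the locking pattern used above (a standalone example,
 * not part of the build; the filename is illustrative):
 *
 *   int fd = open("nodes.conf", O_WRONLY|O_CREAT, 0644);
 *   if (fd != -1 && flock(fd, LOCK_EX|LOCK_NB) == 0) {
 *       // Lock held: never close(fd), so the lock outlives this scope
 *       // and is only released when the process exits.
 *   }
 *
 * A second process running the same code gets EWOULDBLOCK from flock(),
 * which is how two nodes sharing one nodes.conf are detected. */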

/* Some flags (currently just the NOFAILOVER flag) may need to be updated
 * in the "myself" node based on the current configuration of the node,
 * that may change at runtime via CONFIG SET. This function changes the
 * set of flags in myself->flags accordingly. */
void clusterUpdateMyselfFlags(void) {
    int oldflags = myself->flags;
    int nofailover = server.cluster_slave_no_failover ?
                     CLUSTER_NODE_NOFAILOVER : 0;
    myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
    myself->flags |= nofailover;
    if (myself->flags != oldflags) {
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE);
    }
}

void clusterInit(void) {
    int saveconf = 0;

    server.cluster = zmalloc(sizeof(clusterState));
    server.cluster->myself = NULL;
    server.cluster->currentEpoch = 0;
    server.cluster->state = CLUSTER_FAIL;
    server.cluster->size = 1;
    server.cluster->todo_before_sleep = 0;
    server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType,NULL);
    server.cluster->failover_auth_time = 0;
    server.cluster->failover_auth_count = 0;
    server.cluster->failover_auth_rank = 0;
    server.cluster->failover_auth_epoch = 0;
    server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
    server.cluster->lastVoteEpoch = 0;
    for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
        server.cluster->stats_bus_messages_sent[i] = 0;
        server.cluster->stats_bus_messages_received[i] = 0;
    }
    server.cluster->stats_pfail_nodes = 0;
    memset(server.cluster->slots,0, sizeof(server.cluster->slots));
    clusterCloseAllSlots();

    /* Lock the cluster config file to make sure every node uses
     * its own nodes.conf. */
    if (clusterLockConfig(server.cluster_configfile) == C_ERR)
        exit(1);

    /* Load or create a new nodes configuration. */
    if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
        /* No configuration found. We will just use the random name provided
         * by the createClusterNode() function. */
        myself = server.cluster->myself =
            createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
        serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
            myself->name);
        clusterAddNode(myself);
        saveconf = 1;
    }
    if (saveconf) clusterSaveConfigOrDie(1);

    /* We need a listening TCP port for our cluster messaging needs. */
    server.cfd_count = 0;

    /* Port sanity check II
     * The other handshake port check is triggered too late to stop
     * us from trying to use a too-high cluster port number. */
    if (server.port > (65535-CLUSTER_PORT_INCR)) {
        serverLog(LL_WARNING, "Redis port number too high. "
                   "Cluster communication port is 10,000 port "
                   "numbers higher than your Redis port. "
                   "Your Redis port number must be "
                   "lower than 55535.");
        exit(1);
    }

    if (listenToPort(server.port+CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == C_ERR)
    {
        exit(1);
    } else {
        int j;

        for (j = 0; j < server.cfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
                clusterAcceptHandler, NULL) == AE_ERR)
                    serverPanic("Unrecoverable error creating Redis Cluster "
                                "file event.");
        }
    }

    /* The slots -> keys map is a radix tree. Initialize it here. */
    server.cluster->slots_to_keys = raxNew();
    memset(server.cluster->slots_keys_count,0,
           sizeof(server.cluster->slots_keys_count));

    /* Set myself->port / cport to my listening ports, we'll just need to
     * discover the IP address via MEET messages. */
    myself->port = server.port;
    myself->cport = server.port+CLUSTER_PORT_INCR;
    if (server.cluster_announce_port)
        myself->port = server.cluster_announce_port;
    if (server.cluster_announce_bus_port)
        myself->cport = server.cluster_announce_bus_port;

    server.cluster->mf_end = 0;
    resetManualFailover();
    clusterUpdateMyselfFlags();
}

/* Reset a node performing a soft or hard reset:
 *
 * 1) All other nodes are forgotten.
 * 2) All the assigned / open slots are released.
 * 3) If the node is a slave, it turns into a master.
 * 4) Only for hard reset: a new Node ID is generated.
 * 5) Only for hard reset: currentEpoch and configEpoch are set to 0.
 * 6) The new configuration is saved and the cluster state updated.
 * 7) If the node was a slave, the whole data set is flushed away. */
void clusterReset(int hard) {
    dictIterator *di;
    dictEntry *de;
    int j;

    /* Turn into master. */
    if (nodeIsSlave(myself)) {
        clusterSetNodeAsMaster(myself);
        replicationUnsetMaster();
        emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
    }

    /* Close slots, reset manual failover state. */
    clusterCloseAllSlots();
    resetManualFailover();

    /* Unassign all the slots. */
    for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);

    /* Forget all the nodes, but myself. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == myself) continue;
        clusterDelNode(node);
    }
    dictReleaseIterator(di);

    /* Hard reset only: set epochs to 0, change node ID. */
    if (hard) {
        sds oldname;

        server.cluster->currentEpoch = 0;
        server.cluster->lastVoteEpoch = 0;
        myself->configEpoch = 0;
        serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");

        /* To change the Node ID we need to remove the old name from the
         * nodes table, change the ID, and re-add back with new name. */
        oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
        dictDelete(server.cluster->nodes,oldname);
        sdsfree(oldname);
        getRandomHexChars(myself->name, CLUSTER_NAMELEN);
        clusterAddNode(myself);
        serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
    }

    /* Make sure to persist the new config and update the state. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_UPDATE_STATE|
                         CLUSTER_TODO_FSYNC_CONFIG);
}
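
/* Illustrative use from redis-cli (hypothetical session; both forms map to
 * clusterReset() above with hard = 0 or 1 respectively):
 *
 *   127.0.0.1:7000> CLUSTER RESET SOFT   (forget nodes, release slots)
 *   127.0.0.1:7000> CLUSTER RESET HARD   (also regenerate the node ID and
 *                                         zero currentEpoch/configEpoch)
 */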

/* -----------------------------------------------------------------------------
 * CLUSTER communication link
 * -------------------------------------------------------------------------- */

clusterLink *createClusterLink(clusterNode *node) {
    clusterLink *link = zmalloc(sizeof(*link));
    link->ctime = mstime();
    link->sndbuf = sdsempty();
    link->rcvbuf = sdsempty();
    link->node = node;
    link->fd = -1;
    return link;
}

/* Free a cluster link, but do not free the associated node of course.
 * This function will just make sure that the original node associated
 * with this link will have the 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
    if (link->fd != -1) {
        aeDeleteFileEvent(server.el, link->fd, AE_READABLE|AE_WRITABLE);
    }
    sdsfree(link->sndbuf);
    sdsfree(link->rcvbuf);
    if (link->node)
        link->node->link = NULL;
    close(link->fd);
    zfree(link);
}

#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    clusterLink *link;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    /* If the server is starting up, don't accept cluster connections:
     * UPDATE messages may interact with the database content. */
    if (server.masterhost == NULL && server.loading) return;

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }
        anetNonBlock(NULL,cfd);
        anetEnableTcpNoDelay(NULL,cfd);

        /* Use non-blocking I/O for cluster messages. */
        serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
        /* Create a link object we use to handle the connection.
         * It gets passed to the readable handler when data is available.
         * Initially the link->node pointer is set to NULL as we don't know
         * which node it is, but the right node is referenced once we know
         * the node identity. */
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}

/* -----------------------------------------------------------------------------
 * Key space handling
 * -------------------------------------------------------------------------- */

/* We have 16384 hash slots. The hash slot of a given key is obtained
 * as the least significant 14 bits of the crc16 of the key.
 *
 * However if the key contains the {...} pattern, only the part between
 * { and } is hashed. This may be useful in the future to force certain
 * keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing between {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}
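
/* Example of the hash tag rule above: only the text between the first '{'
 * and the first following '}' is hashed, so (illustrative keys)
 *
 *   keyHashSlot("{user1000}.following", 20);
 *   keyHashSlot("{user1000}.followers", 20);
 *
 * both hash just "user1000" and therefore map to the same slot, while
 * "foo{}bar" (empty tag) and "foo{bar" (no closing brace) fall back to
 * hashing the whole key. */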

/* -----------------------------------------------------------------------------
 * CLUSTER node API
 * -------------------------------------------------------------------------- */

/* Create a new cluster node, with the specified flags.
 * If "nodename" is NULL this is considered a first handshake and a random
 * node name is assigned to this node (it will be fixed later when we'll
 * receive the first pong).
 *
 * The node is created and returned to the user, but it is not automatically
 * added to the nodes hash table. */
clusterNode *createClusterNode(char *nodename, int flags) {
    clusterNode *node = zmalloc(sizeof(*node));

    if (nodename)
        memcpy(node->name, nodename, CLUSTER_NAMELEN);
    else
        getRandomHexChars(node->name, CLUSTER_NAMELEN);
    node->ctime = mstime();
    node->configEpoch = 0;
    node->flags = flags;
    memset(node->slots,0,sizeof(node->slots));
    node->numslots = 0;
    node->numslaves = 0;
    node->slaves = NULL;
    node->slaveof = NULL;
    node->ping_sent = node->pong_received = 0;
    node->fail_time = 0;
    node->link = NULL;
    memset(node->ip,0,sizeof(node->ip));
    node->port = 0;
    node->cport = 0;
    node->fail_reports = listCreate();
    node->voted_time = 0;
    node->orphaned_time = 0;
    node->repl_offset_time = 0;
    node->repl_offset = 0;
    listSetFreeMethod(node->fail_reports,zfree);
    return node;
}

/* This function is called every time we get a failure report from a node.
 * The side effect is to populate the fail_reports list (or to update
 * the timestamp of an existing report).
 *
 * 'failing' is the node that is in failure state according to the
 * 'sender' node.
 *
 * The function returns 0 if it just updates a timestamp of an existing
 * failure report from the same sender. 1 is returned if a new failure
 * report is created. */
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
    list *l = failing->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* If a failure report from the same sender already exists, just update
     * the timestamp. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) {
            fr->time = mstime();
            return 0;
        }
    }

    /* Otherwise create a new report. */
    fr = zmalloc(sizeof(*fr));
    fr->node = sender;
    fr->time = mstime();
    listAddNodeTail(l,fr);
    return 1;
}

/* Remove failure reports that are too old, where too old means reasonably
 * older than the global node timeout. Note that anyway for a node to be
 * flagged as FAIL we need to have a local PFAIL state that is at least
 * older than the global node timeout, so we don't just trust the number
 * of failure reports from other nodes. */
void clusterNodeCleanupFailureReports(clusterNode *node) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;
    mstime_t maxtime = server.cluster_node_timeout *
                       CLUSTER_FAIL_REPORT_VALIDITY_MULT;
    mstime_t now = mstime();

    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (now - fr->time > maxtime) listDelNode(l,ln);
    }
}

/* Remove the failing report for 'node' if it was previously considered
 * failing by 'sender'. This function is called when a node informs us via
 * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
 *
 * Note that this function is called relatively often as it gets called even
 * when there are no nodes failing, and is O(N), however when the cluster is
 * fine the failure reports list is empty so the function runs in constant
 * time.
 *
 * The function returns 1 if the failure report was found and removed.
 * Otherwise 0 is returned. */
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* Search for a failure report from this sender. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) break;
    }
    if (!ln) return 0; /* No failure report from this sender. */

    /* Remove the failure report. */
    listDelNode(l,ln);
    clusterNodeCleanupFailureReports(node);
    return 1;
}

/* Return the number of external nodes that believe 'node' is failing,
 * not including this node, that may have a PFAIL or FAIL state for this
 * node as well. */
int clusterNodeFailureReportsCount(clusterNode *node) {
    clusterNodeCleanupFailureReports(node);
    return listLength(node->fail_reports);
}

int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
    int j;

    for (j = 0; j < master->numslaves; j++) {
        if (master->slaves[j] == slave) {
            if ((j+1) < master->numslaves) {
                int remaining_slaves = (master->numslaves - j) - 1;
                memmove(master->slaves+j,master->slaves+(j+1),
                        (sizeof(*master->slaves) * remaining_slaves));
            }
            master->numslaves--;
            if (master->numslaves == 0)
                master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
            return C_OK;
        }
    }
    return C_ERR;
}
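
/* Worked example for the memmove() above (hypothetical array): removing
 * slave B from slaves[] = {A, B, C} with numslaves = 3 computes
 * remaining_slaves = 1 and copies the single trailing pointer C over B,
 * giving {A, C}, then numslaves drops to 2. Removing the last element
 * needs no copy at all. */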

int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
    int j;

    /* If it's already a slave, don't add it again. */
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] == slave) return C_ERR;
    master->slaves = zrealloc(master->slaves,
        sizeof(clusterNode*)*(master->numslaves+1));
    master->slaves[master->numslaves] = slave;
    master->numslaves++;
    master->flags |= CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}

int clusterCountNonFailingSlaves(clusterNode *n) {
    int j, okslaves = 0;

    for (j = 0; j < n->numslaves; j++)
        if (!nodeFailed(n->slaves[j])) okslaves++;
    return okslaves;
}

/* Low level cleanup of the node structure. Only called by clusterDelNode(). */
void freeClusterNode(clusterNode *n) {
    sds nodename;
    int j;

    /* If the node has associated slaves, we have to set
     * all the slaves->slaveof fields to NULL (unknown). */
    for (j = 0; j < n->numslaves; j++)
        n->slaves[j]->slaveof = NULL;

    /* Remove this node from the list of slaves of its master. */
    if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);

    /* Unlink from the set of nodes. */
    nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
    serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
    sdsfree(nodename);

    /* Release link and associated data structures. */
    if (n->link) freeClusterLink(n->link);
    listRelease(n->fail_reports);
    zfree(n->slaves);
    zfree(n);
}

/* Add a node to the nodes hash table */
int clusterAddNode(clusterNode *node) {
    int retval;

    retval = dictAdd(server.cluster->nodes,
            sdsnewlen(node->name,CLUSTER_NAMELEN), node);
    return (retval == DICT_OK) ? C_OK : C_ERR;
}

/* Remove a node from the cluster. The function performs the high level
 * cleanup, calling freeClusterNode() for the low level cleanup.
 * Here we do the following:
 *
 * 1) Mark all the slots handled by it as unassigned.
 * 2) Remove all the failure reports sent by this node and referenced by
 *    other nodes.
 * 3) Free the node with freeClusterNode() that will in turn remove it
 *    from the hash table and from the list of slaves of its master, if
 *    it is a slave node.
 */
void clusterDelNode(clusterNode *delnode) {
    int j;
    dictIterator *di;
    dictEntry *de;

    /* 1) Mark slots as unassigned. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (server.cluster->importing_slots_from[j] == delnode)
            server.cluster->importing_slots_from[j] = NULL;
        if (server.cluster->migrating_slots_to[j] == delnode)
            server.cluster->migrating_slots_to[j] = NULL;
        if (server.cluster->slots[j] == delnode)
            clusterDelSlot(j);
    }

    /* 2) Remove failure reports. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == delnode) continue;
        clusterNodeDelFailureReport(node,delnode);
    }
    dictReleaseIterator(di);

    /* 3) Free the node, unlinking it from the cluster. */
    freeClusterNode(delnode);
}

/* Node lookup by name */
clusterNode *clusterLookupNode(const char *name) {
    sds s = sdsnewlen(name, CLUSTER_NAMELEN);
    dictEntry *de;

    de = dictFind(server.cluster->nodes,s);
    sdsfree(s);
    if (de == NULL) return NULL;
    return dictGetVal(de);
}

/* This is only used after the handshake. When we connect to a given IP/PORT
 * as a result of CLUSTER MEET we don't have the node name yet, so we
 * pick a random one, and will fix it when we receive the PONG request using
 * this function. */
void clusterRenameNode(clusterNode *node, char *newname) {
    int retval;
    sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);

    serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
        node->name, newname);
    retval = dictDelete(server.cluster->nodes, s);
    sdsfree(s);
    serverAssert(retval == DICT_OK);
    memcpy(node->name, newname, CLUSTER_NAMELEN);
    clusterAddNode(node);
}

/* -----------------------------------------------------------------------------
 * CLUSTER config epoch handling
 * -------------------------------------------------------------------------- */

/* Return the greatest configEpoch found in the cluster, or the current
 * epoch if greater than any node configEpoch. */
uint64_t clusterGetMaxEpoch(void) {
    uint64_t max = 0;
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        if (node->configEpoch > max) max = node->configEpoch;
    }
    dictReleaseIterator(di);
    if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
    return max;
}

/* If this node epoch is zero or is not already the greatest across the
 * cluster (from the POV of the local configuration), this function will:
 *
 * 1) Generate a new config epoch, incrementing the current epoch.
 * 2) Assign the new epoch to this node, WITHOUT any consensus.
 * 3) Persist the configuration on disk before sending packets with the
 *    new configuration.
 *
 * If the new config epoch is generated and assigned, C_OK is returned,
 * otherwise C_ERR is returned (since the node has already the greatest
 * configuration around) and no operation is performed.
 *
 * Important note: this function violates the principle that config epochs
 * should be generated with consensus and should be unique across the cluster.
 * However Redis Cluster uses these auto-generated config epochs in two
 * cases:
 *
 * 1) When slots are closed after importing. Otherwise resharding would be
 *    too expensive.
 * 2) When CLUSTER FAILOVER is called with options that force a slave to
 *    failover its master even if there is no master majority able to
 *    create a new configuration epoch.
 *
 * Redis Cluster will not explode using this function, even in the case of
 * a collision between this node and another node, generating the same
 * configuration epoch unilaterally, because the config epoch conflict
 * resolution algorithm will eventually move colliding nodes to different
 * config epochs. However using this function may violate the "last failover
 * wins" rule, so it should only be used with care. */
int clusterBumpConfigEpochWithoutConsensus(void) {
    uint64_t maxEpoch = clusterGetMaxEpoch();

    if (myself->configEpoch == 0 ||
        myself->configEpoch != maxEpoch)
    {
        server.cluster->currentEpoch++;
        myself->configEpoch = server.cluster->currentEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);
        serverLog(LL_WARNING,
            "New configEpoch set to %llu",
            (unsigned long long) myself->configEpoch);
        return C_OK;
    } else {
        return C_ERR;
    }
}

/* This function is called when this node is a master, and we receive from
 * another master a configuration epoch that is equal to our configuration
 * epoch.
 *
 * BACKGROUND
 *
 * It is not possible that different slaves get the same config
 * epoch during a failover election, because the slaves need to get voted
 * by a majority. However when we perform a manual resharding of the cluster
 * the node will assign a configuration epoch to itself without asking
 * for agreement. Usually resharding happens when the cluster is working well
 * and is supervised by the sysadmin, however it is possible for a failover
 * to happen exactly while the node we are resharding a slot to assigns itself
 * a new configuration epoch, but before it is able to propagate it.
 *
 * So technically it is possible in this condition that two nodes end with
 * the same configuration epoch.
 *
 * Another possibility is that there are bugs in the implementation causing
 * this to happen.
 *
 * Moreover when a new cluster is created, all the nodes start with the same
 * configEpoch. This collision resolution code allows nodes to automatically
 * end up with a different configEpoch at startup.
 *
 * In all the cases, we want a mechanism that resolves this issue automatically
 * as a safeguard. The same configuration epoch for masters serving different
 * sets of slots is not harmful, but it is if the nodes end up serving the same
 * slots for some reason (manual errors or software bugs) without a proper
 * failover procedure.
 *
 * In general we want a system that eventually always ends with different
 * masters having different configuration epochs whatever happened, since
 * nothing is worse than a split-brain condition in a distributed system.
 *
 * BEHAVIOR
 *
 * When this function gets called, what happens is that if this node
 * has the lexicographically smaller Node ID compared to the other node
 * with the conflicting epoch (the 'sender' node), it will assign itself
 * the greatest configuration epoch currently detected among nodes plus 1.
 *
 * This means that even if there are multiple nodes colliding, the node
 * with the greatest Node ID never moves forward, so eventually all the nodes
 * end with a different configuration epoch.
 */
void clusterHandleConfigEpochCollision(clusterNode *sender) {
    /* Prerequisites: nodes have the same configEpoch and are both masters. */
    if (sender->configEpoch != myself->configEpoch ||
        !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
    /* Don't act if the colliding node has a smaller Node ID. */
    if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
    /* Get the next ID available at the best of this node knowledge. */
    server.cluster->currentEpoch++;
    myself->configEpoch = server.cluster->currentEpoch;
    clusterSaveConfigOrDie(1);
    serverLog(LL_VERBOSE,
        "WARNING: configEpoch collision with node %.40s."
        " configEpoch set to %llu",
        sender->name,
        (unsigned long long) myself->configEpoch);
}
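
/* Worked example (hypothetical IDs and epochs): masters A and B both end
 * up with configEpoch 5, and A's Node ID is lexicographically smaller.
 * When A processes a packet from B, memcmp() above is > 0, so A bumps
 * currentEpoch to 6 and takes it as its configEpoch; B's check returns
 * <= 0 and B never moves, so the collision resolves in one iteration. */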

/* -----------------------------------------------------------------------------
 * CLUSTER nodes blacklist
 *
 * The nodes blacklist is just a way to ensure that a given node with a given
 * Node ID is not re-added before some time has elapsed (this time is specified
 * in seconds in CLUSTER_BLACKLIST_TTL).
 *
 * This is useful when we want to remove a node from the cluster completely:
 * when CLUSTER FORGET is called, it also puts the node into the blacklist so
 * that even if we receive gossip messages from other nodes that still remember
 * about the node we want to remove, we don't re-add it before some time.
 *
 * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, this means
 * that redis-trib has 60 seconds to send CLUSTER FORGET messages to nodes
 * in the cluster without dealing with the problem of other nodes re-adding
 * back the node to nodes we already sent the FORGET command to.
 *
 * The data structure used is a hash table with an sds string representing
 * the node ID as key, and the time when it is ok to re-add the node as
 * value.
 * -------------------------------------------------------------------------- */

#define CLUSTER_BLACKLIST_TTL 60      /* 1 minute. */

/* Before the addNode() or Exists() operations we always remove expired
 * entries from the black list. This is an O(N) operation but it is not a
 * problem since add / exists operations are called very infrequently and
 * the hash table is supposed to contain very few elements at most.
 * However without the cleanup during long uptimes and with some automated
 * node add/removal procedures, entries could accumulate. */
void clusterBlacklistCleanup(void) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes_black_list);
    while((de = dictNext(di)) != NULL) {
        int64_t expire = dictGetUnsignedIntegerVal(de);

        if (expire < server.unixtime)
            dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
    }
    dictReleaseIterator(di);
}

/* Cleanup the blacklist and add a new node ID to the black list. */
void clusterBlacklistAddNode(clusterNode *node) {
    dictEntry *de;
    sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);

    clusterBlacklistCleanup();
    if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
        /* If the key was added, duplicate the sds string representation of
         * the key for the next lookup. We'll free it at the end. */
        id = sdsdup(id);
    }
    de = dictFind(server.cluster->nodes_black_list,id);
    dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
    sdsfree(id);
}

/* Return non-zero if the specified node ID exists in the blacklist.
 * You don't need to pass an sds string here, any pointer to 40 bytes
 * will work. */
int clusterBlacklistExists(char *nodeid) {
    sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
    int retval;

    clusterBlacklistCleanup();
    retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
    sdsfree(id);
    return retval;
}
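
/* Typical flow, per the section comment above: CLUSTER FORGET puts the
 * node into the blacklist via clusterBlacklistAddNode() before deleting
 * it, so for the next CLUSTER_BLACKLIST_TTL seconds
 * clusterBlacklistExists() returns non-zero and the gossip processing
 * code below refuses to handshake with the forgotten node even if other
 * nodes still mention it. */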

/* -----------------------------------------------------------------------------
 * CLUSTER messages exchange - PING/PONG and gossip
 * -------------------------------------------------------------------------- */

/* This function checks if a given node should be marked as FAIL.
 * It happens if the following conditions are met:
 *
 * 1) We received enough failure reports from other master nodes via gossip.
 *    Enough means that the majority of the masters signaled the node is
 *    down recently.
 * 2) We believe this node is in PFAIL state.
 *
 * If a failure is detected we also inform the whole cluster about this
 * event trying to force every other node to set the FAIL flag for the node.
 *
 * Note that the form of agreement used here is weak, as we collect the majority
 * of masters state during some time, and even if we force agreement by
 * propagating the FAIL message, because of partitions we may not reach every
 * node. However:
 *
 * 1) Either we reach the majority and eventually the FAIL state will propagate
 *    to all the cluster.
 * 2) Or there is no majority so no slave promotion will be authorized and the
 *    FAIL flag will be cleared after some time.
 */
void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;
    int needed_quorum = (server.cluster->size / 2) + 1;

    if (!nodeTimedOut(node)) return; /* We can reach it. */
    if (nodeFailed(node)) return; /* Already FAILing. */

    failures = clusterNodeFailureReportsCount(node);
    /* Also count myself as a voter if I'm a master. */
    if (nodeIsMaster(myself)) failures++;
    if (failures < needed_quorum) return; /* No weak agreement from masters. */

    serverLog(LL_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Mark the node as failing. */
    node->flags &= ~CLUSTER_NODE_PFAIL;
    node->flags |= CLUSTER_NODE_FAIL;
    node->fail_time = mstime();

    /* Broadcast the failing node name to everybody, forcing all the other
     * reachable nodes to flag the node as FAIL. */
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
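
/* Worked example (hypothetical cluster): with server.cluster->size == 5
 * masters, needed_quorum is 5/2+1 = 3. If I am a master that flagged the
 * node as PFAIL and I collected failure reports from two other masters,
 * failures is 2+1 = 3, the quorum is reached, and the node is flagged
 * FAIL and broadcast to the cluster. */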

/* This function is called only if a node is marked as FAIL, but we are able
 * to reach it again. It checks if there are the conditions to undo the FAIL
 * state. */
void clearNodeFailureIfNeeded(clusterNode *node) {
    mstime_t now = mstime();

    serverAssert(nodeFailed(node));

    /* For slaves we always clear the FAIL flag if we can contact the
     * node again. */
    if (nodeIsSlave(node) || node->numslots == 0) {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: %s is reachable again.",
                node->name,
                nodeIsSlave(node) ? "replica" : "master without slots");
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
    }

    /* If it is a master and...
     * 1) The FAIL state is old enough.
     * 2) It is still serving slots from our point of view (not failed over).
     * Apparently no one is going to fix these slots, so clear the FAIL flag. */
    if (nodeIsMaster(node) && node->numslots > 0 &&
        (now - node->fail_time) >
        (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
    {
        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
                node->name);
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
    }
}

/* Return true if we already have a node in HANDSHAKE state matching the
 * specified ip address and port number. This function is used in order to
 * avoid adding a new handshake node for the same address multiple times. */
int clusterHandshakeInProgress(char *ip, int port, int cport) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (!nodeInHandshake(node)) continue;
        if (!strcasecmp(node->ip,ip) &&
            node->port == port &&
            node->cport == cport) break;
    }
    dictReleaseIterator(di);
    return de != NULL;
}

/* Start a handshake with the specified address if there is not one
 * already in progress. Returns non-zero if the handshake was actually
 * started. On error zero is returned and errno is set to one of the
 * following values:
 *
 * EAGAIN - There is already a handshake in progress for this address.
 * EINVAL - IP or port are not valid. */
int clusterStartHandshake(char *ip, int port, int cport) {
    clusterNode *n;
    char norm_ip[NET_IP_STR_LEN];
    struct sockaddr_storage sa;

    /* IP sanity check */
    if (inet_pton(AF_INET,ip,
            &(((struct sockaddr_in *)&sa)->sin_addr)))
    {
        sa.ss_family = AF_INET;
    } else if (inet_pton(AF_INET6,ip,
            &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
    {
        sa.ss_family = AF_INET6;
    } else {
        errno = EINVAL;
        return 0;
    }

    /* Port sanity check */
    if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
        errno = EINVAL;
        return 0;
    }

    /* Set norm_ip as the normalized string representation of the node
     * IP address. */
    memset(norm_ip,0,NET_IP_STR_LEN);
    if (sa.ss_family == AF_INET)
        inet_ntop(AF_INET,
            (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
            norm_ip,NET_IP_STR_LEN);
    else
        inet_ntop(AF_INET6,
            (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
            norm_ip,NET_IP_STR_LEN);

    if (clusterHandshakeInProgress(norm_ip,port,cport)) {
        errno = EAGAIN;
        return 0;
    }

    /* Add the node with a random address (NULL as first argument to
     * createClusterNode()). Everything will be fixed during the
     * handshake. */
    n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    n->cport = cport;
    clusterAddNode(n);
    return 1;
}
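
/* This is what CLUSTER MEET ultimately triggers (illustrative session):
 *
 *   127.0.0.1:7000> CLUSTER MEET 127.0.0.1 7001
 *
 * which, assuming the default bus port of base port + CLUSTER_PORT_INCR
 * when none is given, starts a handshake toward 127.0.0.1:7001 with bus
 * port 17001, creating a provisional node with a random name that is
 * renamed via clusterRenameNode() on the first PONG. */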

/* Process the gossip section of PING or PONG packets.
 * Note that this function assumes that the packet is already sanity-checked
 * by the caller, not in the content of the gossip section, but in the
 * length. */
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

    while(count--) {
        uint16_t flags = ntohs(g->flags);
        clusterNode *node;
        sds ci;

        if (server.verbosity == LL_DEBUG) {
            ci = representClusterNodeFlags(sdsempty(), flags);
            serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
                g->nodename,
                g->ip,
                ntohs(g->port),
                ntohs(g->cport),
                ci);
            sdsfree(ci);
        }

        /* Update our state according to the gossip sections. */
        node = clusterLookupNode(g->nodename);
        if (node) {
            /* We already know this node.
               Handle failure reports, only when the sender is a master. */
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
                    if (clusterNodeAddFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                    markNodeAsFailingIfNeeded(node);
                } else {
                    if (clusterNodeDelFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

            /* If from our POV the node is up (no failure flags are set),
             * we have no pending ping for the node, nor we have failure
             * reports for this node, update the last pong time with the
             * one we see from the other nodes. */
            if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                node->ping_sent == 0 &&
                clusterNodeFailureReportsCount(node) == 0)
            {
                mstime_t pongtime = ntohl(g->pong_received);
                pongtime *= 1000; /* Convert back to milliseconds. */

                /* Replace the pong time with the received one only if
                 * it's greater than our view but is not in the future
                 * (with 500 milliseconds tolerance) from the POV of our
                 * clock. */
                if (pongtime <= (server.mstime+500) &&
                    pongtime > node->pong_received)
                {
                    node->pong_received = pongtime;
                }
            }

            /* If we already know this node, but it is not reachable, and
             * we see a different address in the gossip section of a node that
             * can talk with this other node, update the address, disconnect
             * the old link if any, so that we'll attempt to connect with the
             * new address. */
            if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                (strcasecmp(node->ip,g->ip) ||
                 node->port != ntohs(g->port) ||
                 node->cport != ntohs(g->cport)))
            {
                if (node->link) freeClusterLink(node->link);
                memcpy(node->ip,g->ip,NET_IP_STR_LEN);
                node->port = ntohs(g->port);
                node->cport = ntohs(g->cport);
                node->flags &= ~CLUSTER_NODE_NOADDR;
            }
        } else {
            /* If it's not in NOADDR state and we don't have it, we
             * start a handshake process against this IP/PORT pair.
             *
             * Note that we require that the sender of this gossip message
             * is a well known node in our cluster, otherwise we risk
             * joining another cluster. */
            if (sender &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename))
            {
                clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport));
            }
        }

        /* Next node */
        g++;
    }
}

/* IP -> string conversion. 'buf' is supposed to be at least 46 bytes.
 * If 'announced_ip' length is non-zero, it is used instead of extracting
 * the IP from the socket peer address. */
void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
    if (announced_ip[0] != '\0') {
        memcpy(buf,announced_ip,NET_IP_STR_LEN);
        buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
    } else {
        anetPeerToString(link->fd, buf, NET_IP_STR_LEN, NULL);
    }
}

/* Update the node address to the IP address that can be extracted
 * from link->fd, or if hdr->myip is non empty, to the address the node
 * is announcing to us. The port is taken from the packet header as well.
 *
 * If the address or port changed, disconnect the node link so that we'll
 * connect again to the new address.
 *
 * If the IP/port pair is already correct, no operation is performed at
 * all.
 *
 * The function returns 0 if the node address is still the same,
 * otherwise 1 is returned. */
int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
                              clusterMsg *hdr)
{
    char ip[NET_IP_STR_LEN] = {0};
    int port = ntohs(hdr->port);
    int cport = ntohs(hdr->cport);

    /* We don't proceed if the link is the same as the sender link, as this
     * function is designed to see if the node link is consistent with the
     * symmetric link that is used to receive PINGs from the node.
     *
     * As a side effect this function never frees the passed 'link', so
     * it is safe to call during packet processing. */
    if (link == node->link) return 0;

    nodeIp2String(ip,link,hdr->myip);
    if (node->port == port && node->cport == cport &&
        strcmp(ip,node->ip) == 0) return 0;

    /* IP / port is different, update it. */
    memcpy(node->ip,ip,sizeof(ip));
    node->port = port;
    node->cport = cport;
    if (node->link) freeClusterLink(node->link);
    node->flags &= ~CLUSTER_NODE_NOADDR;
    serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
        node->name, node->ip, node->port);

    /* Check if this is our master and we have to change the
     * replication target as well. */
    if (nodeIsSlave(myself) && myself->slaveof == node)
        replicationSetMaster(node->ip, node->port);
    return 1;
}
1499
1500 /* Reconfigure the specified node 'n' as a master. This function is called when
1501 * a node that we believed to be a slave is now acting as master in order to
1502 * update the state of the node. */
1503 void clusterSetNodeAsMaster(clusterNode *n) {
1504 if (nodeIsMaster(n)) return;
1505
1506 if (n->slaveof) {
1507 clusterNodeRemoveSlave(n->slaveof,n);
1508 if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
1509 }
1510 n->flags &= ~CLUSTER_NODE_SLAVE;
1511 n->flags |= CLUSTER_NODE_MASTER;
1512 n->slaveof = NULL;
1513
1514 /* Update config and state. */
1515 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1516 CLUSTER_TODO_UPDATE_STATE);
1517 }
1518
1519 /* This function is called when we receive a master configuration via a
1520 * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
1521 * node, and the set of slots claimed under this configEpoch.
1522 *
1523 * What we do is to rebind the slots with newer configuration compared to our
1524 * local configuration, and if needed, we turn ourself into a replica of the
1525 * node (see the function comments for more info).
1526 *
1527 * The 'sender' is the node for which we received a configuration update.
1528 * Sometimes it is not actually the "Sender" of the information, like in the
1529 * case we receive the info via an UPDATE packet. */
1530 void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
1531 int j;
1532 clusterNode *curmaster, *newmaster = NULL;
1533 /* The dirty slots list is a list of slots for which we lose ownership
1534 * while still having keys inside. This usually happens after a failover
1535 * or after a manual cluster reconfiguration operated by the admin.
1536 *
1537 * If the update message is not able to demote a master to slave (in this
1538 * case we'll resync with the master updating the whole key space), we
1539 * need to delete all the keys in the slots we lost ownership of. */
1540 uint16_t dirty_slots[CLUSTER_SLOTS];
1541 int dirty_slots_count = 0;
1542
1543 /* Here we set curmaster to this node or the node this node
1544 * replicates to if it's a slave. In the for loop we are
1545 * interested in checking if slots are taken away from curmaster. */
1546 curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;
1547
1548 if (sender == myself) {
1549 serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
1550 return;
1551 }
1552
1553 for (j = 0; j < CLUSTER_SLOTS; j++) {
1554 if (bitmapTestBit(slots,j)) {
1555 /* The slot is already bound to the sender of this message. */
1556 if (server.cluster->slots[j] == sender) continue;
1557
1558 /* The slot is in importing state, it should be modified only
1559 * manually via redis-trib (example: a resharding is in progress
1560 * and the migrating side slot was already closed and is advertising
1561 * a new config. We still want the slot to be closed manually). */
1562 if (server.cluster->importing_slots_from[j]) continue;
1563
1564 /* We rebind the slot to the new node claiming it if:
1565 * 1) The slot was unassigned or the new node claims it with a
1566 * greater configEpoch.
1567 * 2) We are not currently importing the slot. */
1568 if (server.cluster->slots[j] == NULL ||
1569 server.cluster->slots[j]->configEpoch < senderConfigEpoch)
1570 {
1571 /* Was this slot mine, and does it still contain keys? Mark it as
1572 * a dirty slot. */
1573 if (server.cluster->slots[j] == myself &&
1574 countKeysInSlot(j) &&
1575 sender != myself)
1576 {
1577 dirty_slots[dirty_slots_count] = j;
1578 dirty_slots_count++;
1579 }
1580
1581 if (server.cluster->slots[j] == curmaster)
1582 newmaster = sender;
1583 clusterDelSlot(j);
1584 clusterAddSlot(sender,j);
1585 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1586 CLUSTER_TODO_UPDATE_STATE|
1587 CLUSTER_TODO_FSYNC_CONFIG);
1588 }
1589 }
1590 }
1591
1592 /* After updating the slots configuration, don't do any actual change
1593 * in the state of the server if a module disabled Redis Cluster
1594 * key redirections. */
1595 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
1596 return;
1597
1598 /* If at least one slot was reassigned from a node to another node
1599 * with a greater configEpoch, it is possible that:
1600 * 1) We are a master left without slots. This means that we were
1601 * failed over and we should turn into a replica of the new
1602 * master.
1603 * 2) We are a slave and our master is left without slots. We need
1604 * to replicate to the new slots owner. */
1605 if (newmaster && curmaster->numslots == 0) {
1606 serverLog(LL_WARNING,
1607 "Configuration change detected. Reconfiguring myself "
1608 "as a replica of %.40s", sender->name);
1609 clusterSetMaster(sender);
1610 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1611 CLUSTER_TODO_UPDATE_STATE|
1612 CLUSTER_TODO_FSYNC_CONFIG);
1613 } else if (dirty_slots_count) {
1614 /* If we are here, we received an update message which removed
1615 * ownership for certain slots for which we still have keys, but we
1616 * are still serving some slots, so this master node was not demoted
1617 * to a slave.
1618 *
1619 * In order to maintain a consistent state between keys and slots
1620 * we need to remove all the keys from the slots we lost. */
1621 for (j = 0; j < dirty_slots_count; j++)
1622 delKeysInSlot(dirty_slots[j]);
1623 }
1624 }
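/* A worked example of the dirty slots path above, with assumed numbers:
 * suppose we are a master serving slots 100 and 200, both holding keys,
 * and an UPDATE arrives that assigns slot 100 to another master with a
 * greater configEpoch while leaving slot 200 to us. We are not demoted
 * (curmaster still has slots), so newmaster stays NULL, slot 100 ends up
 * in dirty_slots, and delKeysInSlot(100) removes its keys to keep keys
 * and slots consistent. */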
1625
1626 /* When this function is called, there is a packet to process starting
1627 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
1628 * function should just handle the higher level stuff of processing the
1629 * packet, modifying the cluster state if needed.
1630 *
1631 * The function returns 1 if the link is still valid after the packet
1632 * was processed, otherwise 0 if the link was freed since the packet
1633 * processing led to some inconsistency error (for instance a PONG
1634 * received from the wrong sender ID). */
1635 int clusterProcessPacket(clusterLink *link) {
1636 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
1637 uint32_t totlen = ntohl(hdr->totlen);
1638 uint16_t type = ntohs(hdr->type);
1639
1640 if (type < CLUSTERMSG_TYPE_COUNT)
1641 server.cluster->stats_bus_messages_received[type]++;
1642 serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
1643 type, (unsigned long) totlen);
1644
1645 /* Perform sanity checks */
1646 if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
1647 if (totlen > sdslen(link->rcvbuf)) return 1;
1648
1649 if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
1650 /* Can't handle messages of different versions. */
1651 return 1;
1652 }
1653
1654 uint16_t flags = ntohs(hdr->flags);
1655 uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
1656 clusterNode *sender;
1657
1658 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1659 type == CLUSTERMSG_TYPE_MEET)
1660 {
1661 uint16_t count = ntohs(hdr->count);
1662 uint32_t explen; /* expected length of this packet */
1663
1664 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1665 explen += (sizeof(clusterMsgDataGossip)*count);
1666 if (totlen != explen) return 1;
1667 } else if (type == CLUSTERMSG_TYPE_FAIL) {
1668 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1669
1670 explen += sizeof(clusterMsgDataFail);
1671 if (totlen != explen) return 1;
1672 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
1673 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1674
1675 explen += sizeof(clusterMsgDataPublish) -
1676 8 +
1677 ntohl(hdr->data.publish.msg.channel_len) +
1678 ntohl(hdr->data.publish.msg.message_len);
1679 if (totlen != explen) return 1;
1680 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
1681 type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
1682 type == CLUSTERMSG_TYPE_MFSTART)
1683 {
1684 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1685
1686 if (totlen != explen) return 1;
1687 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
1688 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1689
1690 explen += sizeof(clusterMsgDataUpdate);
1691 if (totlen != explen) return 1;
1692 } else if (type == CLUSTERMSG_TYPE_MODULE) {
1693 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1694
1695 explen += sizeof(clusterMsgDataPublish) -
1696 3 + ntohl(hdr->data.module.msg.len);
1697 if (totlen != explen) return 1;
1698 }
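/* Worked example of the length checks above: a PING carrying 3 gossip
 * entries must satisfy, exactly,
 *
 *   totlen == sizeof(clusterMsg) - sizeof(union clusterMsgData)
 *             + 3*sizeof(clusterMsgDataGossip);
 *
 * The "-8" and "-3" corrections in the PUBLISH and MODULE cases account
 * for the fixed-size placeholder bulk_data arrays declared in those
 * payload structures (our reading of cluster.h), which are replaced by
 * the real variable-length payload on the wire. */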
1699
1700 /* Check if the sender is a known node. */
1701 sender = clusterLookupNode(hdr->sender);
1702 if (sender && !nodeInHandshake(sender)) {
1703 /* Update our currentEpoch if we see a newer epoch in the cluster. */
1704 senderCurrentEpoch = ntohu64(hdr->currentEpoch);
1705 senderConfigEpoch = ntohu64(hdr->configEpoch);
1706 if (senderCurrentEpoch > server.cluster->currentEpoch)
1707 server.cluster->currentEpoch = senderCurrentEpoch;
1708 /* Update the sender configEpoch if it is publishing a newer one. */
1709 if (senderConfigEpoch > sender->configEpoch) {
1710 sender->configEpoch = senderConfigEpoch;
1711 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1712 CLUSTER_TODO_FSYNC_CONFIG);
1713 }
1714 /* Update the replication offset info for this node. */
1715 sender->repl_offset = ntohu64(hdr->offset);
1716 sender->repl_offset_time = mstime();
1717 /* If we are a slave performing a manual failover and our master
1718 * sent its offset while already paused, populate the MF state. */
1719 if (server.cluster->mf_end &&
1720 nodeIsSlave(myself) &&
1721 myself->slaveof == sender &&
1722 hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
1723 server.cluster->mf_master_offset == 0)
1724 {
1725 server.cluster->mf_master_offset = sender->repl_offset;
1726 serverLog(LL_WARNING,
1727 "Received replication offset for paused "
1728 "master manual failover: %lld",
1729 server.cluster->mf_master_offset);
1730 }
1731 }
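/* For example (illustrative numbers): if our currentEpoch is 5 and the
 * header above carries currentEpoch 7, we jump straight to 7. Epochs only
 * move forward, never backwards, and the same monotonic rule is applied
 * to the per-node configEpoch just above. */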
1732
1733 /* Initial processing of PING and MEET requests replying with a PONG. */
1734 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
1735 serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);
1736
1737 /* We use incoming MEET messages in order to set the address
1738 * for 'myself', since only other cluster nodes will send us
1739 * MEET messages on handshakes, when the cluster joins, or
1740 * later if we changed address, and those nodes will use our
1741 * official address to connect to us. So obtaining this address
1742 * from the socket is a simple way to discover / update our own
1743 * address in the cluster without it being hardcoded in the config.
1744 *
1745 * However if we don't have an address at all, we update the address
1746 * even with a normal PING packet. If it's wrong it will be fixed
1747 * by MEET later. */
1748 if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
1749 server.cluster_announce_ip == NULL)
1750 {
1751 char ip[NET_IP_STR_LEN];
1752
1753 if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
1754 strcmp(ip,myself->ip))
1755 {
1756 memcpy(myself->ip,ip,NET_IP_STR_LEN);
1757 serverLog(LL_WARNING,"IP address for this node updated to %s",
1758 myself->ip);
1759 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1760 }
1761 }
1762
1763 /* Add this node if it is new for us and the msg type is MEET.
1764 * In this stage we don't try to add the node with the right
1765 * flags, slaveof pointer, and so forth, as these details will be
1766 * resolved when we receive PONGs from the node. */
1767 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
1768 clusterNode *node;
1769
1770 node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
1771 nodeIp2String(node->ip,link,hdr->myip);
1772 node->port = ntohs(hdr->port);
1773 node->cport = ntohs(hdr->cport);
1774 clusterAddNode(node);
1775 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1776 }
1777
1778 /* If this is a MEET packet from an unknown node, we still process
1779 * the gossip section here since we have to trust the sender because
1780 * of the message type. */
1781 if (!sender && type == CLUSTERMSG_TYPE_MEET)
1782 clusterProcessGossipSection(hdr,link);
1783
1784 /* Anyway reply with a PONG */
1785 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
1786 }
1787
1788 /* PING, PONG, MEET: process config information. */
1789 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1790 type == CLUSTERMSG_TYPE_MEET)
1791 {
1792 serverLog(LL_DEBUG,"%s packet received: %p",
1793 type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
1794 (void*)link->node);
1795 if (link->node) {
1796 if (nodeInHandshake(link->node)) {
1797 /* If we already have this node, try to change the
1798 * IP/port of the node with the new one. */
1799 if (sender) {
1800 serverLog(LL_VERBOSE,
1801 "Handshake: we already know node %.40s, "
1802 "updating the address if needed.", sender->name);
1803 if (nodeUpdateAddressIfNeeded(sender,link,hdr))
1804 {
1805 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1806 CLUSTER_TODO_UPDATE_STATE);
1807 }
1808 /* Free this node as we already have it. This will
1809 * cause the link to be freed as well. */
1810 clusterDelNode(link->node);
1811 return 0;
1812 }
1813
1814 /* First thing to do is replacing the random name with the
1815 * right node name if this was a handshake stage. */
1816 clusterRenameNode(link->node, hdr->sender);
1817 serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
1818 link->node->name);
1819 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
1820 link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
1821 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1822 } else if (memcmp(link->node->name,hdr->sender,
1823 CLUSTER_NAMELEN) != 0)
1824 {
1825 /* If the reply has a non matching node ID we
1826 * disconnect this node and set it as not having an associated
1827 * address. */
1828 serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
1829 link->node->name,
1830 (int)(mstime()-(link->node->ctime)),
1831 link->node->flags);
1832 link->node->flags |= CLUSTER_NODE_NOADDR;
1833 link->node->ip[0] = '\0';
1834 link->node->port = 0;
1835 link->node->cport = 0;
1836 freeClusterLink(link);
1837 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1838 return 0;
1839 }
1840 }
1841
1842 /* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
1843 * announced. This is a dynamic flag that we receive from the
1844 * sender, and the latest status must be trusted. We need it to
1845 * be propagated because the slave ranking, used to compute the
1846 * delay of each slave in the voting process, needs to know
1847 * which instances are really competing. */
1848 if (sender) {
1849 int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
1850 sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
1851 sender->flags |= nofailover;
1852 }
1853
1854 /* Update the node address if it changed. */
1855 if (sender && type == CLUSTERMSG_TYPE_PING &&
1856 !nodeInHandshake(sender) &&
1857 nodeUpdateAddressIfNeeded(sender,link,hdr))
1858 {
1859 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1860 CLUSTER_TODO_UPDATE_STATE);
1861 }
1862
1863 /* Update our info about the node */
1864 if (link->node && type == CLUSTERMSG_TYPE_PONG) {
1865 link->node->pong_received = mstime();
1866 link->node->ping_sent = 0;
1867
1868 /* The PFAIL condition can be reversed without external
1869 * help if it is momentary (that is, if it does not
1870 * turn into a FAIL state).
1871 *
1872 * The FAIL condition is also reversible under specific
1873 * conditions detected by clearNodeFailureIfNeeded(). */
1874 if (nodeTimedOut(link->node)) {
1875 link->node->flags &= ~CLUSTER_NODE_PFAIL;
1876 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1877 CLUSTER_TODO_UPDATE_STATE);
1878 } else if (nodeFailed(link->node)) {
1879 clearNodeFailureIfNeeded(link->node);
1880 }
1881 }
1882
1883 /* Check for role switch: slave -> master or master -> slave. */
1884 if (sender) {
1885 if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
1886 sizeof(hdr->slaveof)))
1887 {
1888 /* Node is a master. */
1889 clusterSetNodeAsMaster(sender);
1890 } else {
1891 /* Node is a slave. */
1892 clusterNode *master = clusterLookupNode(hdr->slaveof);
1893
1894 if (nodeIsMaster(sender)) {
1895 /* Master turned into a slave! Reconfigure the node. */
1896 clusterDelNodeSlots(sender);
1897 sender->flags &= ~(CLUSTER_NODE_MASTER|
1898 CLUSTER_NODE_MIGRATE_TO);
1899 sender->flags |= CLUSTER_NODE_SLAVE;
1900
1901 /* Update config and state. */
1902 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1903 CLUSTER_TODO_UPDATE_STATE);
1904 }
1905
1906 /* Master node changed for this slave? */
1907 if (master && sender->slaveof != master) {
1908 if (sender->slaveof)
1909 clusterNodeRemoveSlave(sender->slaveof,sender);
1910 clusterNodeAddSlave(master,sender);
1911 sender->slaveof = master;
1912
1913 /* Update config. */
1914 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1915 }
1916 }
1917 }
1918
1919 /* Update our info about served slots.
1920 *
1921 * Note: this MUST happen after we update the master/slave state
1922 * so that CLUSTER_NODE_MASTER flag will be set. */
1923
1924 /* Many checks are only needed if the set of served slots this
1925 * instance claims is different compared to the set of slots we have
1926 * for it. Check this ASAP to avoid other computationally expensive
1927 * checks later. */
1928 clusterNode *sender_master = NULL; /* Sender or its master if slave. */
1929 int dirty_slots = 0; /* Sender claimed slots don't match my view? */
1930
1931 if (sender) {
1932 sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
1933 if (sender_master) {
1934 dirty_slots = memcmp(sender_master->slots,
1935 hdr->myslots,sizeof(hdr->myslots)) != 0;
1936 }
1937 }
1938
1939 /* 1) If the sender of the message is a master, and we detected that
1940 * the set of slots it claims changed, scan the slots to see if we
1941 * need to update our configuration. */
1942 if (sender && nodeIsMaster(sender) && dirty_slots)
1943 clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
1944
1945 /* 2) We also check for the reverse condition, that is, the sender
1946 * claims to serve slots we know are served by a master with a
1947 * greater configEpoch. If this happens we inform the sender.
1948 *
1949 * This is useful because sometimes after a partition heals, a
1950 * reappearing master may be the last one to claim a given set of
1951 * hash slots, but with a configuration that other instances know to
1952 * be deprecated. Example:
1953 *
1954 * A and B are master and slave for slots 1,2,3.
1955 * A is partitioned away, B gets promoted.
1956 * B is partitioned away, and A returns available.
1957 *
1958 * Usually B would PING A publishing its set of served slots and its
1959 * configEpoch, but because of the partition B can't inform A of the
1960 * new configuration, so other nodes that have an updated table must
1961 * do it. In this way A will stop acting as a master (or can try to
1962 * failover if the conditions to win the election are met). */
1963 if (sender && dirty_slots) {
1964 int j;
1965
1966 for (j = 0; j < CLUSTER_SLOTS; j++) {
1967 if (bitmapTestBit(hdr->myslots,j)) {
1968 if (server.cluster->slots[j] == sender ||
1969 server.cluster->slots[j] == NULL) continue;
1970 if (server.cluster->slots[j]->configEpoch >
1971 senderConfigEpoch)
1972 {
1973 serverLog(LL_VERBOSE,
1974 "Node %.40s has old slots configuration, sending "
1975 "an UPDATE message about %.40s",
1976 sender->name, server.cluster->slots[j]->name);
1977 clusterSendUpdate(sender->link,
1978 server.cluster->slots[j]);
1979
1980 /* TODO: instead of exiting the loop send every other
1981 * UPDATE packet for other nodes that are the new owner
1982 * of sender's slots. */
1983 break;
1984 }
1985 }
1986 }
1987 }
1988
1989 /* If our config epoch collides with the sender's try to fix
1990 * the problem. */
1991 if (sender &&
1992 nodeIsMaster(myself) && nodeIsMaster(sender) &&
1993 senderConfigEpoch == myself->configEpoch)
1994 {
1995 clusterHandleConfigEpochCollision(sender);
1996 }
1997
1998 /* Get info from the gossip section */
1999 if (sender) clusterProcessGossipSection(hdr,link);
2000 } else if (type == CLUSTERMSG_TYPE_FAIL) {
2001 clusterNode *failing;
2002
2003 if (sender) {
2004 failing = clusterLookupNode(hdr->data.fail.about.nodename);
2005 if (failing &&
2006 !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
2007 {
2008 serverLog(LL_NOTICE,
2009 "FAIL message received from %.40s about %.40s",
2010 hdr->sender, hdr->data.fail.about.nodename);
2011 failing->flags |= CLUSTER_NODE_FAIL;
2012 failing->fail_time = mstime();
2013 failing->flags &= ~CLUSTER_NODE_PFAIL;
2014 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2015 CLUSTER_TODO_UPDATE_STATE);
2016 }
2017 } else {
2018 serverLog(LL_NOTICE,
2019 "Ignoring FAIL message from unknown node %.40s about %.40s",
2020 hdr->sender, hdr->data.fail.about.nodename);
2021 }
2022 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
2023 robj *channel, *message;
2024 uint32_t channel_len, message_len;
2025
2026 /* Don't bother creating useless objects if there are no
2027 * Pub/Sub subscribers. */
2028 if (dictSize(server.pubsub_channels) ||
2029 listLength(server.pubsub_patterns))
2030 {
2031 channel_len = ntohl(hdr->data.publish.msg.channel_len);
2032 message_len = ntohl(hdr->data.publish.msg.message_len);
2033 channel = createStringObject(
2034 (char*)hdr->data.publish.msg.bulk_data,channel_len);
2035 message = createStringObject(
2036 (char*)hdr->data.publish.msg.bulk_data+channel_len,
2037 message_len);
2038 pubsubPublishMessage(channel,message);
2039 decrRefCount(channel);
2040 decrRefCount(message);
2041 }
2042 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
2043 if (!sender) return 1; /* We don't know that node. */
2044 clusterSendFailoverAuthIfNeeded(sender,hdr);
2045 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
2046 if (!sender) return 1; /* We don't know that node. */
2047 /* We consider this vote only if the sender is a master serving
2048 * a non zero number of slots, and its currentEpoch is greater than or
2049 * equal to the epoch at which this node started the election. */
2050 if (nodeIsMaster(sender) && sender->numslots > 0 &&
2051 senderCurrentEpoch >= server.cluster->failover_auth_epoch)
2052 {
2053 server.cluster->failover_auth_count++;
2054 /* Maybe we reached a quorum here, set a flag to make sure
2055 * we check ASAP. */
2056 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
2057 }
2058 } else if (type == CLUSTERMSG_TYPE_MFSTART) {
2059 /* This message is acceptable only if I'm a master and the sender
2060 * is one of my slaves. */
2061 if (!sender || sender->slaveof != myself) return 1;
2062 /* Manual failover requested from slaves. Initialize the state
2063 * accordingly. */
2064 resetManualFailover();
2065 server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
2066 server.cluster->mf_slave = sender;
2067 pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*2));
2068 serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
2069 sender->name);
2070 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2071 clusterNode *n; /* The node the update is about. */
2072 uint64_t reportedConfigEpoch =
2073 ntohu64(hdr->data.update.nodecfg.configEpoch);
2074
2075 if (!sender) return 1; /* We don't know the sender. */
2076 n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
2077 if (!n) return 1; /* We don't know the reported node. */
2078 if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
2079
2080 /* If in our current config the node is a slave, set it as a master. */
2081 if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
2082
2083 /* Update the node's configEpoch. */
2084 n->configEpoch = reportedConfigEpoch;
2085 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2086 CLUSTER_TODO_FSYNC_CONFIG);
2087
2088 /* Check the bitmap of served slots and update our
2089 * config accordingly. */
2090 clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
2091 hdr->data.update.nodecfg.slots);
2092 } else if (type == CLUSTERMSG_TYPE_MODULE) {
2093 if (!sender) return 1; /* Protect the module from unknown nodes. */
2094 /* We need to route this message back to the right module subscribed
2095 * for the right message type. */
2096 uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
2097 uint32_t len = ntohl(hdr->data.module.msg.len);
2098 uint8_t type = hdr->data.module.msg.type;
2099 unsigned char *payload = hdr->data.module.msg.bulk_data;
2100 moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
2101 } else {
2102 serverLog(LL_WARNING,"Received unknown packet type: %d", type);
2103 }
2104 return 1;
2105 }
2106
2107 /* This function is called when we detect the link with this node is lost.
2108 We set the node as no longer connected. The Cluster Cron will detect
2109 this condition and will try to reconnect it.
2110
2111 If instead the node is a temporary node used to accept a query, we
2112 completely free the node on error. */
2113 void handleLinkIOError(clusterLink *link) {
2114 freeClusterLink(link);
2115 }
2116
2117 /* Send data. This is handled using a trivial send buffer that gets
2118 * consumed by write(). We don't try to optimize this for speed too much
2119 * as this is a very low traffic channel. */
2120 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2121 clusterLink *link = (clusterLink*) privdata;
2122 ssize_t nwritten;
2123 UNUSED(el);
2124 UNUSED(mask);
2125
2126 nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
2127 if (nwritten <= 0) {
2128 serverLog(LL_DEBUG,"I/O error writing to node link: %s",
2129 (nwritten == -1) ? strerror(errno) : "short write");
2130 handleLinkIOError(link);
2131 return;
2132 }
2133 sdsrange(link->sndbuf,nwritten,-1);
2134 if (sdslen(link->sndbuf) == 0)
2135 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
2136 }
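/* A short example of the partial write handling above, with assumed
 * numbers: if sndbuf holds 100 bytes and write() returns 40, the
 * sdsrange() call keeps the 60 unsent bytes and the AE_WRITABLE event
 * stays registered, so this handler fires again to send the rest; only
 * when the buffer fully drains is the write event removed. */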
2137
2138 /* Read data. Try to read the first field of the header to check the
2139 * full length of the packet. When a whole packet is in memory this function
2140 * will call the function to process the packet. And so forth. */
2141 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2142 char buf[sizeof(clusterMsg)];
2143 ssize_t nread;
2144 clusterMsg *hdr;
2145 clusterLink *link = (clusterLink*) privdata;
2146 unsigned int readlen, rcvbuflen;
2147 UNUSED(el);
2148 UNUSED(mask);
2149
2150 while(1) { /* Read as long as there is data to read. */
2151 rcvbuflen = sdslen(link->rcvbuf);
2152 if (rcvbuflen < 8) {
2153 /* First, obtain the first 8 bytes to get the full message
2154 * length. */
2155 readlen = 8 - rcvbuflen;
2156 } else {
2157 /* Finally read the full message. */
2158 hdr = (clusterMsg*) link->rcvbuf;
2159 if (rcvbuflen == 8) {
2160 /* Perform some sanity check on the message signature
2161 * and length. */
2162 if (memcmp(hdr->sig,"RCmb",4) != 0 ||
2163 ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
2164 {
2165 serverLog(LL_WARNING,
2166 "Bad message length or signature received "
2167 "from Cluster bus.");
2168 handleLinkIOError(link);
2169 return;
2170 }
2171 }
2172 readlen = ntohl(hdr->totlen) - rcvbuflen;
2173 if (readlen > sizeof(buf)) readlen = sizeof(buf);
2174 }
2175
2176 nread = read(fd,buf,readlen);
2177 if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */
2178
2179 if (nread <= 0) {
2180 /* I/O error... */
2181 serverLog(LL_DEBUG,"I/O error reading from node link: %s",
2182 (nread == 0) ? "connection closed" : strerror(errno));
2183 handleLinkIOError(link);
2184 return;
2185 } else {
2186 /* Read data and recast the pointer to the new buffer. */
2187 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
2188 hdr = (clusterMsg*) link->rcvbuf;
2189 rcvbuflen += nread;
2190 }
2191
2192 /* Total length obtained? Process this packet. */
2193 if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
2194 if (clusterProcessPacket(link)) {
2195 sdsfree(link->rcvbuf);
2196 link->rcvbuf = sdsempty();
2197 } else {
2198 return; /* Link no longer valid. */
2199 }
2200 }
2201 }
2202 }
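/* As an assumed example, reading a 2080 byte packet proceeds as follows:
 * the first pass reads 8 bytes (signature plus totlen) and validates
 * them, the next passes read min(totlen - rcvbuflen, sizeof(buf)) bytes
 * each, and once rcvbuflen == ntohl(hdr->totlen) == 2080 the packet is
 * handed to clusterProcessPacket() and the receive buffer is reset. */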
2203
2204 /* Put stuff into the send buffer.
2205 *
2206 * It is guaranteed that this function will never have as a side effect
2207 * the link to be invalidated, so it is safe to call this function
2208 * from event handlers that will do stuff with the same link later. */
2209 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
2210 if (sdslen(link->sndbuf) == 0 && msglen != 0)
2211 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE|AE_BARRIER,
2212 clusterWriteHandler,link);
2213
2214 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
2215
2216 /* Populate sent messages stats. */
2217 clusterMsg *hdr = (clusterMsg*) msg;
2218 uint16_t type = ntohs(hdr->type);
2219 if (type < CLUSTERMSG_TYPE_COUNT)
2220 server.cluster->stats_bus_messages_sent[type]++;
2221 }
2222
2223 /* Send a message to all the nodes that are part of the cluster having
2224 * a connected link.
2225 *
2226 * It is guaranteed that this function will never have as a side effect
2227 * some node->link to be invalidated, so it is safe to call this function
2228 * from event handlers that will do stuff with node links later. */
2229 void clusterBroadcastMessage(void *buf, size_t len) {
2230 dictIterator *di;
2231 dictEntry *de;
2232
2233 di = dictGetSafeIterator(server.cluster->nodes);
2234 while((de = dictNext(di)) != NULL) {
2235 clusterNode *node = dictGetVal(de);
2236
2237 if (!node->link) continue;
2238 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
2239 continue;
2240 clusterSendMessage(node->link,buf,len);
2241 }
2242 dictReleaseIterator(di);
2243 }
2244
2245 /* Build the message header. hdr must point to a buffer at least
2246 * sizeof(clusterMsg) in bytes. */
2247 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
2248 int totlen = 0;
2249 uint64_t offset;
2250 clusterNode *master;
2251
2252 /* If this node is a master, we send its slots bitmap and configEpoch.
2253 * If this node is a slave we send the master's information instead (the
2254 * node is flagged as slave so the receiver knows that it is NOT really
2255 * in charge of these slots). */
2256 master = (nodeIsSlave(myself) && myself->slaveof) ?
2257 myself->slaveof : myself;
2258
2259 memset(hdr,0,sizeof(*hdr));
2260 hdr->ver = htons(CLUSTER_PROTO_VER);
2261 hdr->sig[0] = 'R';
2262 hdr->sig[1] = 'C';
2263 hdr->sig[2] = 'm';
2264 hdr->sig[3] = 'b';
2265 hdr->type = htons(type);
2266 memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
2267
2268 /* If cluster-announce-ip option is enabled, force the receivers of our
2269 * packets to use the specified address for this node. Otherwise if the
2270 * first byte is zero, they'll do auto discovery. */
2271 memset(hdr->myip,0,NET_IP_STR_LEN);
2272 if (server.cluster_announce_ip) {
2273 strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
2274 hdr->myip[NET_IP_STR_LEN-1] = '\0';
2275 }
2276
2277 /* Handle cluster-announce-port as well. */
2278 int announced_port = server.cluster_announce_port ?
2279 server.cluster_announce_port : server.port;
2280 int announced_cport = server.cluster_announce_bus_port ?
2281 server.cluster_announce_bus_port :
2282 (server.port + CLUSTER_PORT_INCR);
2283
2284 memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
2285 memset(hdr->slaveof,0,CLUSTER_NAMELEN);
2286 if (myself->slaveof != NULL)
2287 memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
2288 hdr->port = htons(announced_port);
2289 hdr->cport = htons(announced_cport);
2290 hdr->flags = htons(myself->flags);
2291 hdr->state = server.cluster->state;
2292
2293 /* Set the currentEpoch and configEpochs. */
2294 hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
2295 hdr->configEpoch = htonu64(master->configEpoch);
2296
2297 /* Set the replication offset. */
2298 if (nodeIsSlave(myself))
2299 offset = replicationGetSlaveOffset();
2300 else
2301 offset = server.master_repl_offset;
2302 hdr->offset = htonu64(offset);
2303
2304 /* Set the message flags. */
2305 if (nodeIsMaster(myself) && server.cluster->mf_end)
2306 hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
2307
2308 /* Compute the message length for certain messages. For other messages
2309 * this is up to the caller. */
2310 if (type == CLUSTERMSG_TYPE_FAIL) {
2311 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2312 totlen += sizeof(clusterMsgDataFail);
2313 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2314 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2315 totlen += sizeof(clusterMsgDataUpdate);
2316 }
2317 hdr->totlen = htonl(totlen);
2318 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
2319 }
2320
2321 /* Return non zero if the node is already present in the gossip section of the
2322 * message pointed to by 'hdr', which has 'count' gossip entries. Otherwise
2323 * zero is returned. Helper for clusterSendPing(). */
2324 int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
2325 int j;
2326 for (j = 0; j < count; j++) {
2327 if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
2328 CLUSTER_NAMELEN) == 0) break;
2329 }
2330 return j != count;
2331 }
2332
2333 /* Set the i-th entry of the gossip section in the message pointed to by 'hdr'
2334 * to the info of the specified node 'n'. */
2335 void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
2336 clusterMsgDataGossip *gossip;
2337 gossip = &(hdr->data.ping.gossip[i]);
2338 memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
2339 gossip->ping_sent = htonl(n->ping_sent/1000);
2340 gossip->pong_received = htonl(n->pong_received/1000);
2341 memcpy(gossip->ip,n->ip,sizeof(n->ip));
2342 gossip->port = htons(n->port);
2343 gossip->cport = htons(n->cport);
2344 gossip->flags = htons(n->flags);
2345 gossip->notused1 = 0;
2346 }
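/* Note the second-level granularity used on the wire: the entry stores
 * htonl(n->pong_received/1000), and the receiver reconstructs
 * milliseconds as ntohl(g->pong_received)*1000 (see
 * clusterProcessGossipSection), so up to 999 ms are lost per gossip hop.
 * This is why the receiver only accepts the value when it is newer than
 * its own view and not in the future beyond the 500 ms tolerance. */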
2347
2348 /* Send a PING or PONG packet to the specified node, making sure to add enough
2349 * gossip information. */
2350 void clusterSendPing(clusterLink *link, int type) {
2351 unsigned char *buf;
2352 clusterMsg *hdr;
2353 int gossipcount = 0; /* Number of gossip sections added so far. */
2354 int wanted; /* Number of gossip sections we want to append if possible. */
2355 int totlen; /* Total packet length. */
2356 /* freshnodes is the max number of nodes we can hope to append at all:
2357 * nodes available minus two (ourself and the node we are sending the
2358 * message to). However in practice there may be fewer valid nodes,
2359 * since nodes in handshake state or disconnected are not considered. */
2360 int freshnodes = dictSize(server.cluster->nodes)-2;
2361
2362 /* How many gossip sections we want to add? 1/10 of the number of nodes
2363 * and anyway at least 3. Why 1/10?
2364 *
2365 * If we have N masters, with N/10 entries, and we consider that in
2366 * node_timeout we exchange with each other node at least 4 packets
2367 * (we ping in the worst case in node_timeout/2 time, and we also
2368 * receive two pings from the host), we have a total of 8 packets
2369 * in the node_timeout*2 failure reports validity time. So we have
2370 * that, for a single PFAIL node, we can expect to receive the following
2371 * number of failure reports (in the specified window of time):
2372 *
2373 * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
2374 *
2375 * PROB = probability of being featured in a single gossip entry,
2376 * which is 1 / NUM_OF_NODES.
2377 * ENTRIES = 10.
2378 * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
2379 *
2380 * If we assume we have just masters (so num of nodes and num of masters
2381 * is the same), with 1/10 we always get over the majority, and specifically
2382 * 80% of the number of nodes, to account for many masters failing at the
2383 * same time.
2384 *
2385 * Since we have non-voting slaves that lower the probability of an entry
2386 * to feature our node, we set the number of entries per packet as
2387 * 10% of the total nodes we have. */
2388 wanted = floor(dictSize(server.cluster->nodes)/10);
2389 if (wanted < 3) wanted = 3;
2390 if (wanted > freshnodes) wanted = freshnodes;
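/* Plugging assumed numbers into the math above: with 50 nodes (all
 * masters), wanted = floor(50/10) = 5 entries per packet, and in the
 * node_timeout*2 window a PFAIL node is expected to be featured in about
 *
 *   (1/50) * 5 * (2*4*50) = 40
 *
 * failure reports, that is 80% of the nodes, comfortably above the
 * majority needed to flag it as FAIL. */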
2391
2392 /* Include all the nodes in PFAIL state, so that failure reports
2393 * propagate faster and nodes go from PFAIL to FAIL state sooner. */
2394 int pfail_wanted = server.cluster->stats_pfail_nodes;
2395
2396 /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
2397 * later according to the number of gossip sections we really were able
2398 * to put inside the packet. */
2399 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2400 totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
2401 /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
2402 * sizeof(clusterMsg) or more. */
2403 if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
2404 buf = zcalloc(totlen);
2405 hdr = (clusterMsg*) buf;
2406
2407 /* Populate the header. */
2408 if (link->node && type == CLUSTERMSG_TYPE_PING)
2409 link->node->ping_sent = mstime();
2410 clusterBuildMessageHdr(hdr,type);
2411
2412 /* Populate the gossip fields */
2413 int maxiterations = wanted*3;
2414 while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
2415 dictEntry *de = dictGetRandomKey(server.cluster->nodes);
2416 clusterNode *this = dictGetVal(de);
2417
2418 /* Don't include this node: the whole packet header is about us
2419 * already, so we just gossip about other nodes. */
2420 if (this == myself) continue;
2421
2422 /* PFAIL nodes will be added later. */
2423 if (this->flags & CLUSTER_NODE_PFAIL) continue;
2424
2425 /* In the gossip section don't include:
2426 * 1) Nodes in HANDSHAKE state.
2427 * 2) Nodes with the NOADDR flag set.
2428 * 3) Disconnected nodes if they don't have configured slots.
2429 */
2430 if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
2431 (this->link == NULL && this->numslots == 0))
2432 {
2433 freshnodes--; /* Technically not correct, but saves CPU. */
2434 continue;
2435 }
2436
2437 /* Do not add a node we already have. */
2438 if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
2439
2440 /* Add it */
2441 clusterSetGossipEntry(hdr,gossipcount,this);
2442 freshnodes--;
2443 gossipcount++;
2444 }
2445
2446 /* If there are PFAIL nodes, add them at the end. */
2447 if (pfail_wanted) {
2448 dictIterator *di;
2449 dictEntry *de;
2450
2451 di = dictGetSafeIterator(server.cluster->nodes);
2452 while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
2453 clusterNode *node = dictGetVal(de);
2454 if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
2455 if (node->flags & CLUSTER_NODE_NOADDR) continue;
2456 if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
2457 clusterSetGossipEntry(hdr,gossipcount,node);
2458 freshnodes--;
2459 gossipcount++;
2460 /* We use the count of the gossip entries we actually allocated,
2461 * since the PFAIL stats may not match perfectly with the current
2462 * number of PFAIL nodes. */
2463 pfail_wanted--;
2464 }
2465 dictReleaseIterator(di);
2466 }
2467
2468 /* Ready to send... fix the totlen field and queue the message in the
2469 * output buffer. */
2470 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2471 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
2472 hdr->count = htons(gossipcount);
2473 hdr->totlen = htonl(totlen);
2474 clusterSendMessage(link,buf,totlen);
2475 zfree(buf);
2476 }
2477
2478 /* Send a PONG packet to every connected node that's not in handshake state
2479 * and for which we have a valid link.
2480 *
2481 * In Redis Cluster pongs are not used just for failure detection, but also
2482 * to carry important configuration information. So broadcasting a pong is
2483 * useful when something changes in the configuration and we want to make
2484 * the cluster aware ASAP (for instance after a slave promotion).
2485 *
2486 * The 'target' argument specifies the receiving instances using the
2487 * defines below:
2488 *
2489 * CLUSTER_BROADCAST_ALL -> All known instances.
2490 * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
2491 */
2492 #define CLUSTER_BROADCAST_ALL 0
2493 #define CLUSTER_BROADCAST_LOCAL_SLAVES 1
2494 void clusterBroadcastPong(int target) {
2495 dictIterator *di;
2496 dictEntry *de;
2497
2498 di = dictGetSafeIterator(server.cluster->nodes);
2499 while((de = dictNext(di)) != NULL) {
2500 clusterNode *node = dictGetVal(de);
2501
2502 if (!node->link) continue;
2503 if (node == myself || nodeInHandshake(node)) continue;
2504 if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
2505 int local_slave =
2506 nodeIsSlave(node) && node->slaveof &&
2507 (node->slaveof == myself || node->slaveof == myself->slaveof);
2508 if (!local_slave) continue;
2509 }
2510 clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
2511 }
2512 dictReleaseIterator(di);
2513 }
2514
2515 /* Send a PUBLISH message.
2516 *
2517 * If link is NULL, then the message is broadcasted to the whole cluster. */
2518 void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
2519 unsigned char buf[sizeof(clusterMsg)], *payload;
2520 clusterMsg *hdr = (clusterMsg*) buf;
2521 uint32_t totlen;
2522 uint32_t channel_len, message_len;
2523
2524 channel = getDecodedObject(channel);
2525 message = getDecodedObject(message);
2526 channel_len = sdslen(channel->ptr);
2527 message_len = sdslen(message->ptr);
2528
2529 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
2530 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2531 totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
2532
2533 hdr->data.publish.msg.channel_len = htonl(channel_len);
2534 hdr->data.publish.msg.message_len = htonl(message_len);
2535 hdr->totlen = htonl(totlen);
2536
2537 /* Try to use the local buffer if possible */
2538 if (totlen < sizeof(buf)) {
2539 payload = buf;
2540 } else {
2541 payload = zmalloc(totlen);
2542 memcpy(payload,hdr,sizeof(*hdr));
2543 hdr = (clusterMsg*) payload;
2544 }
2545 memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
2546 memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
2547 message->ptr,sdslen(message->ptr));
2548
2549 if (link)
2550 clusterSendMessage(link,payload,totlen);
2551 else
2552 clusterBroadcastMessage(payload,totlen);
2553
2554 decrRefCount(channel);
2555 decrRefCount(message);
2556 if (payload != buf) zfree(payload);
2557 }
2558
2559 /* Send a FAIL message to all the nodes we are able to contact.
2560 * The FAIL message is sent when we detect that a node is failing
2561 * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
2562 * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
2563 * nodes to do the same ASAP. */
2564 void clusterSendFail(char *nodename) {
2565 unsigned char buf[sizeof(clusterMsg)];
2566 clusterMsg *hdr = (clusterMsg*) buf;
2567
2568 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
2569 memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
2570 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
2571 }
2572
2573 /* Send an UPDATE message to the specified link carrying the specified 'node'
2574 * slots configuration. The node name, slots bitmap, and configEpoch info
2575 * are included. */
2576 void clusterSendUpdate(clusterLink *link, clusterNode *node) {
2577 unsigned char buf[sizeof(clusterMsg)];
2578 clusterMsg *hdr = (clusterMsg*) buf;
2579
2580 if (link == NULL) return;
2581 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
2582 memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
2583 hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
2584 memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
2585 clusterSendMessage(link,buf,ntohl(hdr->totlen));
2586 }
2587
2588 /* Send a MODULE message.
2589 *
2590 * If link is NULL, then the message is broadcasted to the whole cluster. */
2591 void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
2592 unsigned char *payload, uint32_t len) {
2593 unsigned char buf[sizeof(clusterMsg)], *heapbuf;
2594 clusterMsg *hdr = (clusterMsg*) buf;
2595 uint32_t totlen;
2596
2597 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
2598 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2599 totlen += sizeof(clusterMsgModule) - 3 + len;
2600
2601 hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
2602 hdr->data.module.msg.type = type;
2603 hdr->data.module.msg.len = htonl(len);
2604 hdr->totlen = htonl(totlen);
2605
2606 /* Try to use the local buffer if possible */
2607 if (totlen < sizeof(buf)) {
2608 heapbuf = buf;
2609 } else {
2610 heapbuf = zmalloc(totlen);
2611 memcpy(heapbuf,hdr,sizeof(*hdr));
2612 hdr = (clusterMsg*) heapbuf;
2613 }
2614 memcpy(hdr->data.module.msg.bulk_data,payload,len);
2615
2616 if (link)
2617 clusterSendMessage(link,heapbuf,totlen);
2618 else
2619 clusterBroadcastMessage(heapbuf,totlen);
2620
2621 if (heapbuf != buf) zfree(heapbuf);
2622 }
2623
2624 /* This function gets a cluster node ID string as target, the same way node
2625 * addresses are represented on the modules side, resolves the node, and sends
2626 * the message. If the target is NULL the message is broadcasted.
2627 *
2628 * The function returns C_OK if the target is valid, otherwise C_ERR is
2629 * returned. */
2630 int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) {
2631 clusterNode *node = NULL;
2632
2633 if (target != NULL) {
2634 node = clusterLookupNode(target);
2635 if (node == NULL || node->link == NULL) return C_ERR;
2636 }
2637
2638 clusterSendModule(target ? node->link : NULL,
2639 module_id, type, payload, len);
2640 return C_OK;
2641 }
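/* A minimal usage sketch from a module's point of view, assuming the
 * modules API entry point RedisModule_SendClusterMessage() is what routes
 * messages down to this function (MSGTYPE_EXAMPLE is a hypothetical,
 * module-defined message type):
 *
 *   RedisModule_SendClusterMessage(ctx,NULL,MSGTYPE_EXAMPLE,
 *                                  (unsigned char*)"hi",2);
 *
 * A NULL target broadcasts to the whole cluster, mirroring the NULL link
 * case above. */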
2642
2643 /* -----------------------------------------------------------------------------
2644 * CLUSTER Pub/Sub support
2645 *
2646 * For now we do very little, just propagating PUBLISH messages across the whole
2647 * cluster. In the future we'll try to get smarter and avoid propagating
2648 * those messages to hosts without receivers for a given channel.
2649 * -------------------------------------------------------------------------- */
2650 void clusterPropagatePublish(robj *channel, robj *message) {
2651 clusterSendPublish(NULL, channel, message);
2652 }
2653
2654 /* -----------------------------------------------------------------------------
2655 * SLAVE node specific functions
2656 * -------------------------------------------------------------------------- */
2657
2658 /* This function sends a FAILOVER_AUTH_REQUEST message to every node in order to
2659 * see if there is the quorum for this slave instance to failover its failing
2660 * master.
2661 *
2662 * Note that we send the failover request to everybody, master and slave nodes,
2663 * but only the masters are supposed to reply to our query. */
2664 void clusterRequestFailoverAuth(void) {
2665 unsigned char buf[sizeof(clusterMsg)];
2666 clusterMsg *hdr = (clusterMsg*) buf;
2667 uint32_t totlen;
2668
2669 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
2670 /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
2671 * in the header to communicate to the nodes receiving the message that
2672 * they should authorize the failover even if the master is working. */
2673 if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
2674 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2675 hdr->totlen = htonl(totlen);
2676 clusterBroadcastMessage(buf,totlen);
2677 }
2678
2679 /* Send a FAILOVER_AUTH_ACK message to the specified node. */
2680 void clusterSendFailoverAuth(clusterNode *node) {
2681 unsigned char buf[sizeof(clusterMsg)];
2682 clusterMsg *hdr = (clusterMsg*) buf;
2683 uint32_t totlen;
2684
2685 if (!node->link) return;
2686 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
2687 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2688 hdr->totlen = htonl(totlen);
2689 clusterSendMessage(node->link,buf,totlen);
2690 }
2691
2692 /* Send a MFSTART message to the specified node. */
2693 void clusterSendMFStart(clusterNode *node) {
2694 unsigned char buf[sizeof(clusterMsg)];
2695 clusterMsg *hdr = (clusterMsg*) buf;
2696 uint32_t totlen;
2697
2698 if (!node->link) return;
2699 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
2700 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2701 hdr->totlen = htonl(totlen);
2702 clusterSendMessage(node->link,buf,totlen);
2703 }
2704
2705 /* Vote for the node asking for our vote if the conditions are met. */
2706 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
2707 clusterNode *master = node->slaveof;
2708 uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
2709 uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
2710 unsigned char *claimed_slots = request->myslots;
2711 int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
2712 int j;
2713
2714 /* If we are not a master serving at least 1 slot, we don't have the
2715 * right to vote, as the cluster size in Redis Cluster is the number
2716 * of masters serving at least one slot, and the quorum is half the
2717 * cluster size plus one. */
2718 if (nodeIsSlave(myself) || myself->numslots == 0) return;
2719
2720 /* Request epoch must be >= our currentEpoch.
2721 * Note that it is impossible for it to actually be greater since
2722 * our currentEpoch was updated as a side effect of receiving this
2723 * request, if the request epoch was greater. */
2724 if (requestCurrentEpoch < server.cluster->currentEpoch) {
2725 serverLog(LL_WARNING,
2726 "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
2727 node->name,
2728 (unsigned long long) requestCurrentEpoch,
2729 (unsigned long long) server.cluster->currentEpoch);
2730 return;
2731 }
2732
2733 /* I already voted for this epoch? Return ASAP. */
2734 if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
2735 serverLog(LL_WARNING,
2736 "Failover auth denied to %.40s: already voted for epoch %llu",
2737 node->name,
2738 (unsigned long long) server.cluster->currentEpoch);
2739 return;
2740 }
2741
2742 /* Node must be a slave and its master down.
2743 * The master can be non failing if the request is flagged
2744 * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
2745 if (nodeIsMaster(node) || master == NULL ||
2746 (!nodeFailed(master) && !force_ack))
2747 {
2748 if (nodeIsMaster(node)) {
2749 serverLog(LL_WARNING,
2750 "Failover auth denied to %.40s: it is a master node",
2751 node->name);
2752 } else if (master == NULL) {
2753 serverLog(LL_WARNING,
2754 "Failover auth denied to %.40s: I don't know its master",
2755 node->name);
2756 } else if (!nodeFailed(master)) {
2757 serverLog(LL_WARNING,
2758 "Failover auth denied to %.40s: its master is up",
2759 node->name);
2760 }
2761 return;
2762 }
2763
2764 /* Refuse the vote if we already voted for a slave of this master
2765 * within two times the node timeout. This is not strictly needed for
2766 * correctness of the algorithm but makes the base case more linear. */
2767 if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
2768 {
2769 serverLog(LL_WARNING,
2770 "Failover auth denied to %.40s: "
2771 "can't vote about this master before %lld milliseconds",
2772 node->name,
2773 (long long) ((server.cluster_node_timeout*2)-
2774 (mstime() - node->slaveof->voted_time)));
2775 return;
2776 }
2777
2778 /* The slave requesting the vote must have a configEpoch for the claimed
2779 * slots that is >= the one of the masters currently serving the same
2780 * slots in the current configuration. */
2781 for (j = 0; j < CLUSTER_SLOTS; j++) {
2782 if (bitmapTestBit(claimed_slots, j) == 0) continue;
2783 if (server.cluster->slots[j] == NULL ||
2784 server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
2785 {
2786 continue;
2787 }
2788 /* If we reached this point we found a slot that in our current slots
2789 * is served by a master with a greater configEpoch than the one claimed
2790 * by the slave requesting our vote. Refuse to vote for this slave. */
2791 serverLog(LL_WARNING,
2792 "Failover auth denied to %.40s: "
2793 "slot %d epoch (%llu) > reqEpoch (%llu)",
2794 node->name, j,
2795 (unsigned long long) server.cluster->slots[j]->configEpoch,
2796 (unsigned long long) requestConfigEpoch);
2797 return;
2798 }
2799
2800 /* We can vote for this slave. */
2801 server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
2802 node->slaveof->voted_time = mstime();
2803 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
2804 clusterSendFailoverAuth(node);
2805 serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
2806 node->name, (unsigned long long) server.cluster->currentEpoch);
2807 }
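/* A concrete example under the rules above, with assumed numbers: in a
 * cluster with 5 masters serving slots, a slave's election succeeds once
 * it collects a majority of 3 FAILOVER_AUTH_ACKs for its epoch. Each
 * master grants at most one vote per epoch (lastVoteEpoch) and at most
 * one vote per failing master within node_timeout*2 (voted_time). */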
2808
2809 /* This function returns the "rank" of this instance, a slave, in the context
2810 * of its master-slaves ring. The rank of the slave is given by the number of
2811 * other slaves for the same master that have a better replication offset
2812 * compared to the local one (better means greater, so they claim more data).
2813 *
2814 * A slave with rank 0 is the one with the greatest (most up to date)
2815 * replication offset, and so forth. Note that because of how the rank is
2816 * computed, multiple slaves may have the same rank if they have the same offset.
2817 *
2818 * The slave rank is used to add a delay to start an election in order to
2819 * get voted and replace a failing master. Slaves with better replication
2820 * offsets are more likely to win. */
2821 int clusterGetSlaveRank(void) {
2822 long long myoffset;
2823 int j, rank = 0;
2824 clusterNode *master;
2825
2826 serverAssert(nodeIsSlave(myself));
2827 master = myself->slaveof;
2828 if (master == NULL) return 0; /* Never called by slaves without master. */
2829
2830 myoffset = replicationGetSlaveOffset();
2831 for (j = 0; j < master->numslaves; j++)
2832 if (master->slaves[j] != myself &&
2833 !nodeCantFailover(master->slaves[j]) &&
2834 master->slaves[j]->repl_offset > myoffset) rank++;
2835 return rank;
2836 }
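/* For example, with three slaves of the same master at replication
 * offsets 200, 150 and 150, the first gets rank 0 and the other two both
 * get rank 1 (equal offsets, equal rank), so the most updated slave will
 * start its election with the shortest delay. */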
2837
2838 /* This function is called by clusterHandleSlaveFailover() in order to
2839 * let the slave log why it is not able to failover. Sometimes the
2840 * conditions are not met, but since the failover function is called again
2841 * and again, we can't log the same things continuously.
2842 *
2843 * This function works by logging only if a given set of conditions are
2844 * true:
2845 *
2846 * 1) The reason for which the failover can't be initiated changed.
2847 * The reasons also include a NONE reason, to which we reset the state
2848 * when the slave finds that its master is fine (no FAIL flag).
2849 * 2) Also, the log is emitted again if the master is still down and
2850 * the reason for not failing over is still the same, but more than
2851 * CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
2852 * 3) Finally, the function only logs if the slave is down for more than
2853 * five seconds + NODE_TIMEOUT. This way nothing is logged when a
2854 * failover starts in a reasonable time.
2855 *
2856 * The function is called with the reason why the slave can't failover
2857 * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
2858 *
2859 * The function is guaranteed to be called only if 'myself' is a slave. */
clusterLogCantFailover(int reason)2860 void clusterLogCantFailover(int reason) {
2861 char *msg;
2862 static time_t lastlog_time = 0;
2863 mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
2864
2865 /* Don't log if we have the same reason for some time. */
2866 if (reason == server.cluster->cant_failover_reason &&
2867 time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
2868 return;
2869
2870 server.cluster->cant_failover_reason = reason;
2871
2872 /* We also don't emit any log if the master failed no long ago, the
2873 * goal of this function is to log slaves in a stalled condition for
2874 * a long time. */
2875 if (myself->slaveof &&
2876 nodeFailed(myself->slaveof) &&
2877 (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;
2878
2879 switch(reason) {
2880 case CLUSTER_CANT_FAILOVER_DATA_AGE:
2881 msg = "Disconnected from master for longer than allowed. "
2882 "Please check the 'cluster-replica-validity-factor' configuration "
2883 "option.";
2884 break;
2885 case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
2886 msg = "Waiting the delay before I can start a new failover.";
2887 break;
2888 case CLUSTER_CANT_FAILOVER_EXPIRED:
2889 msg = "Failover attempt expired.";
2890 break;
2891 case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
2892 msg = "Waiting for votes, but majority still not reached.";
2893 break;
2894 default:
2895 msg = "Unknown reason code.";
2896 break;
2897 }
2898 lastlog_time = time(NULL);
2899 serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
2900 }

/* This function implements the final part of automatic and manual failovers,
 * where the slave grabs its master's hash slots, and propagates the new
 * configuration.
 *
 * Note that it's up to the caller to be sure that the node got a new
 * configuration epoch already. */
void clusterFailoverReplaceYourMaster(void) {
    int j;
    clusterNode *oldmaster = myself->slaveof;

    if (nodeIsMaster(myself) || oldmaster == NULL) return;

    /* 1) Turn this node into a master. */
    clusterSetNodeAsMaster(myself);
    replicationUnsetMaster();

    /* 2) Claim all the slots assigned to our master. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (clusterNodeGetSlotBit(oldmaster,j)) {
            clusterDelSlot(j);
            clusterAddSlot(myself,j);
        }
    }

    /* 3) Update state and save config. */
    clusterUpdateState();
    clusterSaveConfigOrDie(1);

    /* 4) Pong all the other nodes so that they can update the state
     *    accordingly and detect that we switched to master role. */
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

    /* 5) If there was a manual failover in progress, clear the state. */
    resetManualFailover();
}

/* This function is called if we are a slave node and our master serving
 * a non-zero amount of hash slots is in FAIL state.
 *
 * The goal of this function is:
 * 1) To check if we are able to perform a failover, i.e. is our data updated?
 * 2) To try to get elected by the masters.
 * 3) To perform the failover, informing all the other nodes.
 */
void clusterHandleSlaveFailover(void) {
    mstime_t data_age;
    mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
    int needed_quorum = (server.cluster->size / 2) + 1;
    int manual_failover = server.cluster->mf_end != 0 &&
                          server.cluster->mf_can_start;
    mstime_t auth_timeout, auth_retry_time;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;

    /* Compute the failover timeout (the max time we have to send votes
     * and wait for replies), and the failover retry time (the time to wait
     * before trying to get voted again).
     *
     * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
     * Retry is two times the Timeout.
     */
    auth_timeout = server.cluster_node_timeout*2;
    if (auth_timeout < 2000) auth_timeout = 2000;
    auth_retry_time = auth_timeout*2;
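
    /* Illustrative example (not from the upstream source): with the default
     * cluster-node-timeout of 15000 ms, auth_timeout is 30000 ms and
     * auth_retry_time is 60000 ms, so a replica waits up to 30 seconds for
     * votes and may retry a failed election after one minute. */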

    /* Preconditions to run the function, that must be met both in case
     * of an automatic or a manual failover:
     * 1) We are a slave.
     * 2) Our master is flagged as FAIL, or this is a manual failover.
     * 3) We don't have the "no failover" configuration set, or this is
     *    a manual failover.
     * 4) Our master is serving slots. */
    if (nodeIsMaster(myself) ||
        myself->slaveof == NULL ||
        (!nodeFailed(myself->slaveof) && !manual_failover) ||
        (server.cluster_slave_no_failover && !manual_failover) ||
        myself->slaveof->numslots == 0)
    {
        /* There are no reasons to failover, so we set the reason why we
         * are returning without failing over to NONE. */
        server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
        return;
    }

    /* Set data_age to the number of milliseconds we are disconnected from
     * the master. */
    if (server.repl_state == REPL_STATE_CONNECTED) {
        data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
                   * 1000;
    } else {
        data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
    }

    /* Remove the node timeout from the data age, as it is fine that we are
     * disconnected from our master at least for the time it took to be
     * flagged as FAIL: that's the baseline. */
    if (data_age > server.cluster_node_timeout)
        data_age -= server.cluster_node_timeout;

    /* Check if our data is recent enough according to the slave validity
     * factor configured by the user.
     *
     * Check bypassed for manual failovers. */
    if (server.cluster_slave_validity_factor &&
        data_age >
        (((mstime_t)server.repl_ping_slave_period * 1000) +
         (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
    {
        if (!manual_failover) {
            clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
            return;
        }
    }
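
    /* Worked example (illustrative, not from the upstream source): with
     * repl-ping-replica-period=10 (seconds), cluster-node-timeout=15000 ms
     * and cluster-replica-validity-factor=10, the maximum tolerated data_age
     * is 10*1000 + 15000*10 = 160000 ms. A replica disconnected for longer
     * than that refuses to start an automatic failover. */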

    /* If the previous failover attempt timed out and the retry time has
     * elapsed, we can set up a new one. */
    if (auth_age > auth_retry_time) {
        server.cluster->failover_auth_time = mstime() +
            500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
            random() % 500; /* Random delay between 0 and 500 milliseconds. */
        server.cluster->failover_auth_count = 0;
        server.cluster->failover_auth_sent = 0;
        server.cluster->failover_auth_rank = clusterGetSlaveRank();
        /* We add another delay that is proportional to the slave rank.
         * Specifically 1 second * rank. This way slaves that probably have
         * a less updated replication offset are penalized. */
        server.cluster->failover_auth_time +=
            server.cluster->failover_auth_rank * 1000;
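        /* Illustrative timeline (not from the upstream source): a replica
         * with rank 2 schedules its election at
         * now + 500 + random(0..499) + 2000 ms, i.e. between 2.5 and 3
         * seconds from now, while a rank 0 replica starts within one
         * second and thus usually wins the election. */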
        /* However if this is a manual failover, no delay is needed. */
        if (server.cluster->mf_end) {
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
        serverLog(LL_WARNING,
            "Start of election delayed for %lld milliseconds "
            "(rank #%d, offset %lld).",
            server.cluster->failover_auth_time - mstime(),
            server.cluster->failover_auth_rank,
            replicationGetSlaveOffset());
        /* Now that we have a scheduled election, broadcast our offset
         * to all the other slaves so that they'll update their offsets
         * if our offset is better. */
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }

    /* It is possible that we received more updated offsets from other
     * slaves for the same master since we computed our election delay.
     * Update the delay if our rank changed.
     *
     * Not performed if this is a manual failover. */
    if (server.cluster->failover_auth_sent == 0 &&
        server.cluster->mf_end == 0)
    {
        int newrank = clusterGetSlaveRank();
        if (newrank > server.cluster->failover_auth_rank) {
            long long added_delay =
                (newrank - server.cluster->failover_auth_rank) * 1000;
            server.cluster->failover_auth_time += added_delay;
            server.cluster->failover_auth_rank = newrank;
            serverLog(LL_WARNING,
                "Replica rank updated to #%d, added %lld milliseconds of delay.",
                newrank, added_delay);
        }
    }

    /* Return ASAP if we still can't start the election. */
    if (mstime() < server.cluster->failover_auth_time) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
        return;
    }

    /* Return ASAP if the election is too old to be valid. */
    if (auth_age > auth_timeout) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
        return;
    }

    /* Ask for votes if needed. */
    if (server.cluster->failover_auth_sent == 0) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
            (unsigned long long) server.cluster->currentEpoch);
        clusterRequestFailoverAuth();
        server.cluster->failover_auth_sent = 1;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
        return; /* Wait for replies. */
    }
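
    /* Illustrative quorum math (not from the upstream source): with
     * cluster size 5 (five masters serving slots), needed_quorum is
     * 5/2+1 = 3, so the check below succeeds once three masters have
     * acknowledged our vote request for this epoch. */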

    /* Check if we reached the quorum. */
    if (server.cluster->failover_auth_count >= needed_quorum) {
        /* We have the quorum, we can finally failover the master. */

        serverLog(LL_WARNING,
            "Failover election won: I'm the new master.");

        /* Update my configEpoch to the epoch of the election. */
        if (myself->configEpoch < server.cluster->failover_auth_epoch) {
            myself->configEpoch = server.cluster->failover_auth_epoch;
            serverLog(LL_WARNING,
                "configEpoch set to %llu after successful failover",
                (unsigned long long) myself->configEpoch);
        }

        /* Take responsibility for the cluster slots. */
        clusterFailoverReplaceYourMaster();
    } else {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
    }
}

/* -----------------------------------------------------------------------------
 * CLUSTER slave migration
 *
 * Slave migration is the process that allows a slave of a master that is
 * already covered by at least another slave, to "migrate" to a master that
 * is orphaned, that is, left with no working slaves.
 * ------------------------------------------------------------------------- */

/* This function is responsible for deciding if this replica should be migrated
 * to a different (orphaned) master. It is called by the clusterCron() function
 * only if:
 *
 * 1) We are a slave node.
 * 2) It was detected that there is at least one orphaned master in
 *    the cluster.
 * 3) We are a slave of one of the masters with the greatest number of
 *    slaves.
 *
 * These checks are performed by the caller since it requires to iterate
 * the nodes anyway, so we only spend time in clusterHandleSlaveMigration()
 * when definitely needed.
 *
 * The function is called with a pre-computed max_slaves, that is the max
 * number of working (not in FAIL state) slaves for a single master.
 *
 * Additional conditions for migration are examined inside the function.
 */
void clusterHandleSlaveMigration(int max_slaves) {
    int j, okslaves = 0;
    clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
    dictIterator *di;
    dictEntry *de;

    /* Step 1: Don't migrate if the cluster state is not ok. */
    if (server.cluster->state != CLUSTER_OK) return;

    /* Step 2: Don't migrate if my master will not be left with at least
     * 'migration-barrier' slaves after my migration. */
    if (mymaster == NULL) return;
    for (j = 0; j < mymaster->numslaves; j++)
        if (!nodeFailed(mymaster->slaves[j]) &&
            !nodeTimedOut(mymaster->slaves[j])) okslaves++;
    if (okslaves <= server.cluster_migration_barrier) return;
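
    /* Illustrative example (not from the upstream source): with the default
     * cluster-migration-barrier of 1, a master with two working replicas can
     * donate one of them (okslaves 2 > barrier 1), while a master with a
     * single working replica never does, as that would leave it unprotected. */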

    /* Step 3: Identify a candidate for migration, and check if among the
     * masters with the greatest number of ok slaves, I'm the one with the
     * smallest node ID (the "candidate slave").
     *
     * Note: this means that eventually a replica migration will occur
     * since slaves that are reachable again always have their FAIL flag
     * cleared, so eventually there must be a candidate. At the same time
     * this does not mean that there are no race conditions possible (two
     * slaves migrating at the same time), but this is unlikely to
     * happen, and harmless when it happens. */
    candidate = myself;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        int okslaves = 0, is_orphaned = 1;

        /* We want to migrate only if this master is working, orphaned, and
         * used to have slaves, or failed over a master that had slaves
         * (MIGRATE_TO flag). This way we only migrate to instances that were
         * supposed to have replicas. */
        if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
        if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;

        /* Check number of working slaves. */
        if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
        if (okslaves > 0) is_orphaned = 0;

        if (is_orphaned) {
            if (!target && node->numslots > 0) target = node;

            /* Track the starting time of the orphaned condition for this
             * master. */
            if (!node->orphaned_time) node->orphaned_time = mstime();
        } else {
            node->orphaned_time = 0;
        }

        /* Check if I'm the slave candidate for the migration: attached
         * to a master with the maximum number of slaves and with the smallest
         * node ID. */
        if (okslaves == max_slaves) {
            for (j = 0; j < node->numslaves; j++) {
                if (memcmp(node->slaves[j]->name,
                           candidate->name,
                           CLUSTER_NAMELEN) < 0)
                {
                    candidate = node->slaves[j];
                }
            }
        }
    }
    dictReleaseIterator(di);

    /* Step 4: perform the migration if there is a target, and if I'm the
     * candidate, but only if the master is continuously orphaned for a
     * couple of seconds, so that during failovers, we give some time to
     * the natural slaves of this instance to advertise their switch from
     * the old master to the new one. */
    if (target && candidate == myself &&
        (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY &&
        !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
    {
        serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
            target->name);
        clusterSetMaster(target);
    }
}

/* -----------------------------------------------------------------------------
 * CLUSTER manual failover
 *
 * These are the important steps performed by slaves during a manual failover:
 * 1) User sends the CLUSTER FAILOVER command. The failover state is
 *    initialized setting mf_end to the millisecond unix time at which we'll
 *    abort the attempt.
 * 2) Slave sends a MFSTART message to the master requesting to pause clients
 *    for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
 *    When master is paused for manual failover, it also starts to flag
 *    packets with CLUSTERMSG_FLAG0_PAUSED.
 * 3) Slave waits for master to send its replication offset flagged as PAUSED.
 * 4) If the slave received the offset from the master, and its own offset
 *    matches, mf_can_start is set to 1, and clusterHandleSlaveFailover()
 *    will perform the failover as usual, with the difference that the vote
 *    request will be modified to force masters to vote for a slave that has
 *    a working master.
 *
 * From the point of view of the master things are simpler: when a
 * PAUSE_CLIENTS packet is received the master sets mf_end as well and
 * the sender in mf_slave. During the time limit for the manual failover
 * the master will just send PINGs more often to this slave, flagged with
 * the PAUSED flag, so that the slave will set mf_master_offset when receiving
 * a packet from the master with this flag set.
 *
 * The goal of the manual failover is to perform a fast failover without
 * data loss due to the asynchronous master-slave replication.
 * -------------------------------------------------------------------------- */
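
/* Illustrative message flow for a manual failover (not part of the upstream
 * source):
 *
 *   client ----- CLUSTER FAILOVER ----> replica
 *   replica ---- MFSTART -------------> master (pauses clients, sets mf_end)
 *   master ----- PING + PAUSED flag --> replica (carries master offset)
 *   replica: offset matches, mf_can_start = 1, then the usual election in
 *   clusterHandleSlaveFailover() runs and the replica is promoted. */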

/* Reset the manual failover state. This works for both masters and slaves
 * as all the state about manual failover is cleared.
 *
 * The function can be used both to initialize the manual failover state at
 * startup or to abort a manual failover in progress. */
void resetManualFailover(void) {
    if (server.cluster->mf_end && clientsArePaused()) {
        server.clients_pause_end_time = 0;
        clientsArePaused(); /* Just use the side effect of the function. */
    }
    server.cluster->mf_end = 0; /* No manual failover in progress. */
    server.cluster->mf_can_start = 0;
    server.cluster->mf_slave = NULL;
    server.cluster->mf_master_offset = 0;
}

/* If a manual failover timed out, abort it. */
void manualFailoverCheckTimeout(void) {
    if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
        serverLog(LL_WARNING,"Manual failover timed out.");
        resetManualFailover();
    }
}

/* This function is called from the cluster cron function in order to go
 * forward with a manual failover state machine. */
void clusterHandleManualFailover(void) {
    /* Return ASAP if no manual failover is in progress. */
    if (server.cluster->mf_end == 0) return;

    /* If mf_can_start is non-zero, the failover was already triggered so the
     * next steps are performed by clusterHandleSlaveFailover(). */
    if (server.cluster->mf_can_start) return;

    if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */

    if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
        /* Our replication offset matches the master replication offset
         * announced after clients were paused. We can start the failover. */
        server.cluster->mf_can_start = 1;
        serverLog(LL_WARNING,
            "All master replication stream processed, "
            "manual failover can start.");
    }
}

/* -----------------------------------------------------------------------------
 * CLUSTER cron job
 * -------------------------------------------------------------------------- */

/* This is executed 10 times every second */
void clusterCron(void) {
    dictIterator *di;
    dictEntry *de;
    int update_state = 0;
    int orphaned_masters; /* How many masters there are without ok slaves. */
    int max_slaves; /* Max number of ok slaves for a single master. */
    int this_slaves; /* Number of ok slaves for our master (if we are slave). */
    mstime_t min_pong = 0, now = mstime();
    clusterNode *min_pong_node = NULL;
    static unsigned long long iteration = 0;
    mstime_t handshake_timeout;

    iteration++; /* Number of times this function was called so far. */

    /* We want to keep myself->ip in sync with the cluster-announce-ip option.
     * The option can be set at runtime via CONFIG SET, so we periodically check
     * if the option changed, to reflect this into myself->ip. */
    {
        static char *prev_ip = NULL;
        char *curr_ip = server.cluster_announce_ip;
        int changed = 0;

        if (prev_ip == NULL && curr_ip != NULL) changed = 1;
        else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
        else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;

        if (changed) {
            if (prev_ip) zfree(prev_ip);
            prev_ip = curr_ip;

            if (curr_ip) {
                /* We always take a copy of the announced IP address, by
                 * duplicating the string. This way later we can check if
                 * the address really changed. */
                prev_ip = zstrdup(prev_ip);
                strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
                myself->ip[NET_IP_STR_LEN-1] = '\0';
            } else {
                myself->ip[0] = '\0'; /* Force autodetection. */
            }
        }
    }

    /* The handshake timeout is the time after which a handshake node that was
     * not turned into a normal node is removed from the nodes. Usually it is
     * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
     * the value of 1 second. */
    handshake_timeout = server.cluster_node_timeout;
    if (handshake_timeout < 1000) handshake_timeout = 1000;

    /* Update myself flags. */
    clusterUpdateMyselfFlags();

    /* Check if we have disconnected nodes and re-establish the connection.
     * Also update a few stats while we are here, that can be used to make
     * better decisions in other parts of the code. */
    di = dictGetSafeIterator(server.cluster->nodes);
    server.cluster->stats_pfail_nodes = 0;
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        /* Not interested in reconnecting the link with myself or nodes
         * for which we have no address. */
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;

        if (node->flags & CLUSTER_NODE_PFAIL)
            server.cluster->stats_pfail_nodes++;

        /* A Node in HANDSHAKE state has a limited lifespan equal to the
         * configured node timeout. */
        if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
            clusterDelNode(node);
            continue;
        }

        if (node->link == NULL) {
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;

            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->cport, NET_FIRST_BIND_ADDR);
            if (fd == -1) {
                /* We got a synchronous error from connect before
                 * clusterSendPing() had a chance to be called.
                 * If node->ping_sent is zero, failure detection can't work,
                 * so we claim we actually sent a ping now (that will
                 * be really sent as soon as the link is obtained). */
                if (node->ping_sent == 0) node->ping_sent = mstime();
                serverLog(LL_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->cport, server.neterr);
                continue;
            }
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
            /* Queue a PING in the new connection ASAP: this is crucial
             * to avoid false positives in failure detection.
             *
             * If the node is flagged as MEET, we send a MEET message instead
             * of a PING one, to force the receiver to add us in its node
             * table. */
            old_ping_sent = node->ping_sent;
            clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
            if (old_ping_sent) {
                /* If there was an active ping before the link was
                 * disconnected, we want to restore the ping time, otherwise
                 * replaced by the clusterSendPing() call. */
                node->ping_sent = old_ping_sent;
            }
            /* We can clear the flag after the first packet is sent.
             * If we'll never receive a PONG, we'll never send new packets
             * to this node. Instead after the PONG is received and we
             * are no longer in meet/handshake status, we want to send
             * normal PING packets. */
            node->flags &= ~CLUSTER_NODE_MEET;

            serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
                    node->name, node->ip, node->cport);
        }
    }
    dictReleaseIterator(di);

    /* Ping some random node once every 10 iterations, so that we usually ping
     * one random node every second. */
    if (!(iteration % 10)) {
        int j;

        /* Check a few random nodes and ping the one with the oldest
         * pong_received time. */
        for (j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);

            /* Don't ping nodes disconnected or with a ping currently active. */
            if (this->link == NULL || this->ping_sent != 0) continue;
            if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
                continue;
            if (min_pong_node == NULL || min_pong > this->pong_received) {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }
        if (min_pong_node) {
            serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }

    /* Iterate nodes to check if we need to flag something as failing.
     * This loop is also responsible to:
     * 1) Check if there are orphaned masters (masters without non failing
     *    slaves).
     * 2) Count the max number of non failing slaves for a single master.
     * 3) Count the number of slaves for our master, if we are a slave. */
    orphaned_masters = 0;
    max_slaves = 0;
    this_slaves = 0;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */
        mstime_t delay;

        if (node->flags &
            (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
                continue;

        /* Orphaned master check, useful only if the current instance
         * is a slave that may migrate to another master. */
        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
            int okslaves = clusterCountNonFailingSlaves(node);

            /* A master is orphaned if it is serving a non-zero number of
             * slots, has no working slaves, but used to have at least one
             * slave, or failed over a master that used to have slaves. */
            if (okslaves == 0 && node->numslots > 0 &&
                node->flags & CLUSTER_NODE_MIGRATE_TO)
            {
                orphaned_masters++;
            }
            if (okslaves > max_slaves) max_slaves = okslaves;
            if (nodeIsSlave(myself) && myself->slaveof == node)
                this_slaves = okslaves;
        }

        /* If we are waiting for the PONG more than half the cluster
         * timeout, reconnect the link: maybe there is a connection
         * issue even if the node is alive. */
        if (node->link && /* is connected */
            now - node->link->ctime >
            server.cluster_node_timeout && /* was not already reconnected */
            node->ping_sent && /* we already sent a ping */
            node->pong_received < node->ping_sent && /* still waiting pong */
            /* and we are waiting for the pong more than timeout/2 */
            now - node->ping_sent > server.cluster_node_timeout/2)
        {
            /* Disconnect the link, it will be reconnected automatically. */
            freeClusterLink(node->link);
        }

        /* If we have currently no active ping in this instance, and the
         * received PONG is older than half the cluster timeout, send
         * a new ping now, to ensure all the nodes are pinged without
         * too big a delay. */
        if (node->link &&
            node->ping_sent == 0 &&
            (now - node->pong_received) > server.cluster_node_timeout/2)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* If we are a master and one of the slaves requested a manual
         * failover, ping it continuously. */
        if (server.cluster->mf_end &&
            nodeIsMaster(myself) &&
            server.cluster->mf_slave == node &&
            node->link)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        /* Check only if we have an active ping for this instance. */
        if (node->ping_sent == 0) continue;

        /* Compute the delay of the PONG. Note that if we already received
         * the PONG, then node->ping_sent is zero, so we can't reach this
         * code at all. */
        delay = now - node->ping_sent;

        if (delay > server.cluster_node_timeout) {
            /* Timeout reached. Set the node as possibly failing if it is
             * not already in this state. */
            if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
                serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
                    node->name);
                node->flags |= CLUSTER_NODE_PFAIL;
                update_state = 1;
            }
        }
    }
    dictReleaseIterator(di);

    /* If we are a slave node but the replication is still turned off,
     * enable it if we know the address of our master and it appears to
     * be up. */
    if (nodeIsSlave(myself) &&
        server.masterhost == NULL &&
        myself->slaveof &&
        nodeHasAddr(myself->slaveof))
    {
        replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
    }

    /* Abort a manual failover if the timeout is reached. */
    manualFailoverCheckTimeout();

    if (nodeIsSlave(myself)) {
        clusterHandleManualFailover();
        if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
            clusterHandleSlaveFailover();
        /* If there are orphaned masters, and we are a slave among the masters
         * with the max number of non-failing slaves, consider migrating to
         * the orphaned masters. Note that it does not make sense to try
         * a migration if there is no master with at least *two* working
         * slaves. */
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
            clusterHandleSlaveMigration(max_slaves);
    }

    if (update_state || server.cluster->state == CLUSTER_FAIL)
        clusterUpdateState();
}

/* This function is called before the event handler returns to sleep for
 * events. It is useful to perform operations that must be done ASAP in
 * reaction to events fired but that are not safe to perform inside event
 * handlers, or to perform potentially expensive tasks that we need to do
 * a single time before replying to clients. */
void clusterBeforeSleep(void) {
    /* Handle failover, this is needed when it is likely that there is already
     * the quorum from masters in order to react fast. */
    if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)
        clusterHandleSlaveFailover();

    /* Update the cluster state. */
    if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)
        clusterUpdateState();

    /* Save the config, possibly using fsync. */
    if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {
        int fsync = server.cluster->todo_before_sleep &
                    CLUSTER_TODO_FSYNC_CONFIG;
        clusterSaveConfigOrDie(fsync);
    }

    /* Reset our flags (not strictly needed since every single function
     * called for flags set should be able to clear its flag). */
    server.cluster->todo_before_sleep = 0;
}

void clusterDoBeforeSleep(int flags) {
    server.cluster->todo_before_sleep |= flags;
}

/* -----------------------------------------------------------------------------
 * Slots management
 * -------------------------------------------------------------------------- */

/* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set,
 * otherwise 0. */
int bitmapTestBit(unsigned char *bitmap, int pos) {
    off_t byte = pos/8;
    int bit = pos&7;
    return (bitmap[byte] & (1<<bit)) != 0;
}

/* Set the bit at position 'pos' in a bitmap. */
void bitmapSetBit(unsigned char *bitmap, int pos) {
    off_t byte = pos/8;
    int bit = pos&7;
    bitmap[byte] |= 1<<bit;
}

/* Clear the bit at position 'pos' in a bitmap. */
void bitmapClearBit(unsigned char *bitmap, int pos) {
    off_t byte = pos/8;
    int bit = pos&7;
    bitmap[byte] &= ~(1<<bit);
}
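
/* Illustrative example (not part of the upstream source): for slot 100,
 * byte = 100/8 = 12 and bit = 100&7 = 4, so bitmapSetBit(bitmap,100)
 * performs bitmap[12] |= (1<<4). The full CLUSTER_SLOTS bitmap therefore
 * takes 16384/8 = 2048 bytes per node. */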

/* Return non-zero if there is at least one master with slaves in the cluster.
 * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
 * MIGRATE_TO flag when a master gets its first slot. */
int clusterMastersHaveSlaves(void) {
    dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
    dictEntry *de;
    int slaves = 0;
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (nodeIsSlave(node)) continue;
        slaves += node->numslaves;
    }
    dictReleaseIterator(di);
    return slaves != 0;
}

/* Set the slot bit and return the old value. */
int clusterNodeSetSlotBit(clusterNode *n, int slot) {
    int old = bitmapTestBit(n->slots,slot);
    bitmapSetBit(n->slots,slot);
    if (!old) {
        n->numslots++;
        /* When a master gets its first slot, even if it has no slaves,
         * it gets flagged with MIGRATE_TO, that is, the master is a valid
         * target for replicas migration, if and only if at least one of
         * the other masters has slaves right now.
         *
         * Normally masters are valid targets of replica migration if:
         * 1. They used to have slaves (but no longer have).
         * 2. They are slaves failing over a master that used to have slaves.
         *
         * However new masters with slots assigned are considered valid
         * migration targets if the rest of the cluster is not entirely
         * slave-less.
         *
         * See https://github.com/antirez/redis/issues/3043 for more info. */
        if (n->numslots == 1 && clusterMastersHaveSlaves())
            n->flags |= CLUSTER_NODE_MIGRATE_TO;
    }
    return old;
}

/* Clear the slot bit and return the old value. */
int clusterNodeClearSlotBit(clusterNode *n, int slot) {
    int old = bitmapTestBit(n->slots,slot);
    bitmapClearBit(n->slots,slot);
    if (old) n->numslots--;
    return old;
}

/* Return the slot bit from the cluster node structure. */
int clusterNodeGetSlotBit(clusterNode *n, int slot) {
    return bitmapTestBit(n->slots,slot);
}

/* Add the specified slot to the list of slots that node 'n' will
 * serve. Return C_OK if the operation ended with success.
 * If the slot is already assigned to another instance this is considered
 * an error and C_ERR is returned. */
int clusterAddSlot(clusterNode *n, int slot) {
    if (server.cluster->slots[slot]) return C_ERR;
    clusterNodeSetSlotBit(n,slot);
    server.cluster->slots[slot] = n;
    return C_OK;
}

/* Delete the specified slot marking it as unassigned.
 * Returns C_OK if the slot was assigned, otherwise if the slot was
 * already unassigned C_ERR is returned. */
int clusterDelSlot(int slot) {
    clusterNode *n = server.cluster->slots[slot];

    if (!n) return C_ERR;
    serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
    server.cluster->slots[slot] = NULL;
    return C_OK;
}

/* Delete all the slots associated with the specified node.
 * The number of deleted slots is returned. */
int clusterDelNodeSlots(clusterNode *node) {
    int deleted = 0, j;

    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (clusterNodeGetSlotBit(node,j)) {
            clusterDelSlot(j);
            deleted++;
        }
    }
    return deleted;
}

/* Clear the migrating / importing state for all the slots.
 * This is useful at initialization and when turning a master into a slave. */
void clusterCloseAllSlots(void) {
    memset(server.cluster->migrating_slots_to,0,
        sizeof(server.cluster->migrating_slots_to));
    memset(server.cluster->importing_slots_from,0,
        sizeof(server.cluster->importing_slots_from));
}

/* -----------------------------------------------------------------------------
 * Cluster state evaluation function
 * -------------------------------------------------------------------------- */

/* The following are defines that are only used in the evaluation function
 * and are based on heuristics. Actually the main point about the rejoin and
 * writable delay is that they should be a few orders of magnitude larger
 * than the network latency. */
#define CLUSTER_MAX_REJOIN_DELAY 5000
#define CLUSTER_MIN_REJOIN_DELAY 500
#define CLUSTER_WRITABLE_DELAY 2000

void clusterUpdateState(void) {
    int j, new_state;
    int reachable_masters = 0;
    static mstime_t among_minority_time;
    static mstime_t first_call_time = 0;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;

    /* If this is a master node, wait some time before turning the state
     * into OK, since it is not a good idea to rejoin the cluster as a writable
     * master, after a reboot, without giving the cluster a chance to
     * reconfigure this node. Note that the delay is calculated starting from
     * the first call to this function and not since the server start, in order
     * not to count the DB loading time. */
    if (first_call_time == 0) first_call_time = mstime();
    if (nodeIsMaster(myself) &&
        server.cluster->state == CLUSTER_FAIL &&
        mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;

    /* Start assuming the state is OK. We'll turn it into FAIL if the
     * right conditions are met. */
    new_state = CLUSTER_OK;

    /* Check if all the slots are covered. */
    if (server.cluster_require_full_coverage) {
        for (j = 0; j < CLUSTER_SLOTS; j++) {
            if (server.cluster->slots[j] == NULL ||
                server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
            {
                new_state = CLUSTER_FAIL;
                break;
            }
        }
    }

    /* Compute the cluster size, that is the number of master nodes
     * serving at least a single slot.
     *
     * At the same time count the number of reachable masters having
     * at least one slot. */
    {
        dictIterator *di;
        dictEntry *de;

        server.cluster->size = 0;
        di = dictGetSafeIterator(server.cluster->nodes);
        while((de = dictNext(di)) != NULL) {
            clusterNode *node = dictGetVal(de);

            if (nodeIsMaster(node) && node->numslots) {
                server.cluster->size++;
                if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
                    reachable_masters++;
            }
        }
        dictReleaseIterator(di);
    }

    /* If we are in a minority partition, change the cluster state
     * to FAIL. */
    {
        int needed_quorum = (server.cluster->size / 2) + 1;

        if (reachable_masters < needed_quorum) {
            new_state = CLUSTER_FAIL;
            among_minority_time = mstime();
        }
    }
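
    /* Illustrative example (not from the upstream source): with cluster
     * size 3, needed_quorum is 3/2+1 = 2. A master partitioned away that
     * can only reach itself sees reachable_masters == 1 < 2 and therefore
     * turns the cluster state to FAIL. */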

    /* Log a state change */
    if (new_state != server.cluster->state) {
        mstime_t rejoin_delay = server.cluster_node_timeout;

        /* If the instance is a master and was partitioned away with the
         * minority, don't let it accept queries for some time after the
         * partition heals, to make sure there is enough time to receive
         * a configuration update. */
        if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
            rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
        if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
            rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;

        if (new_state == CLUSTER_OK &&
            nodeIsMaster(myself) &&
            mstime() - among_minority_time < rejoin_delay)
        {
            return;
        }

        /* Change the state and log the event. */
        serverLog(LL_WARNING,"Cluster state changed: %s",
            new_state == CLUSTER_OK ? "ok" : "fail");
        server.cluster->state = new_state;
    }
}

/* This function is called after the node startup in order to verify that data
 * loaded from disk is in agreement with the cluster configuration:
 *
 * 1) If we find keys about hash slots we have no responsibility for, the
 *    following happens:
 *    A) If no other node is in charge according to the current cluster
 *       configuration, we add these slots to our node.
 *    B) If according to our config other nodes are already in charge for
 *       these slots, we set the slots as IMPORTING from our point of view
 *       in order to justify we have those slots, and in order to make
 *       redis-trib aware of the issue, so that it can try to fix it.
 * 2) If we find data in a DB different from DB0 we return C_ERR to
 *    signal the caller it should quit the server with an error message
 *    or take other actions.
 *
 * The function returns C_OK even when it has to correct the errors
 * described in "1". However, if data is found in a DB different from DB0,
 * C_ERR is returned.
 *
 * The function also uses the logging facility in order to warn the user
 * about desynchronizations between the data we have in memory and the
 * cluster configuration. */
int verifyClusterConfigWithData(void) {
    int j;
    int update_config = 0;

    /* Return ASAP if a module disabled cluster redirections. In that case
     * every master can store keys about every possible hash slot. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return C_OK;

    /* If this node is a slave, don't perform the check at all as we
     * completely depend on the replication stream. */
    if (nodeIsSlave(myself)) return C_OK;

    /* Make sure we only have keys in DB0. */
    for (j = 1; j < server.dbnum; j++) {
        if (dictSize(server.db[j].dict)) return C_ERR;
    }

    /* Check that all the slots we see populated in memory have a corresponding
     * entry in the cluster table. Otherwise fix the table. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
        /* Check if we are assigned to this slot or if we are importing it.
         * In both cases check the next slot as the configuration makes
         * sense. */
        if (server.cluster->slots[j] == myself ||
            server.cluster->importing_slots_from[j] != NULL) continue;

        /* If we are here data and cluster config don't agree, and we have
         * slot 'j' populated even if we are not importing it, nor we are
         * assigned to this slot. Fix this condition. */

        update_config++;
        /* Case A: slot is unassigned. Take responsibility for it. */
        if (server.cluster->slots[j] == NULL) {
            serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
                                  "Taking responsibility for it.",j);
            clusterAddSlot(myself,j);
        } else {
            serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
                                  "assigned to another node. "
                                  "Setting it to importing state.",j);
            server.cluster->importing_slots_from[j] = server.cluster->slots[j];
        }
    }
    if (update_config) clusterSaveConfigOrDie(1);
    return C_OK;
}

/* -----------------------------------------------------------------------------
 * SLAVE nodes handling
 * -------------------------------------------------------------------------- */

/* Set the specified node 'n' as master for this node.
 * If this node is currently a master, it is turned into a slave. */
void clusterSetMaster(clusterNode *n) {
    serverAssert(n != myself);
    serverAssert(myself->numslots == 0);

    if (nodeIsMaster(myself)) {
        myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
        myself->flags |= CLUSTER_NODE_SLAVE;
        clusterCloseAllSlots();
    } else {
        if (myself->slaveof)
            clusterNodeRemoveSlave(myself->slaveof,myself);
    }
    myself->slaveof = n;
    clusterNodeAddSlave(n,myself);
    replicationSetMaster(n->ip, n->port);
    resetManualFailover();
}

/* -----------------------------------------------------------------------------
 * Nodes to string representation functions.
 * -------------------------------------------------------------------------- */

struct redisNodeFlags {
    uint16_t flag;
    char *name;
};

static struct redisNodeFlags redisNodeFlagsTable[] = {
    {CLUSTER_NODE_MYSELF,     "myself,"},
    {CLUSTER_NODE_MASTER,     "master,"},
    {CLUSTER_NODE_SLAVE,      "slave,"},
    {CLUSTER_NODE_PFAIL,      "fail?,"},
    {CLUSTER_NODE_FAIL,       "fail,"},
    {CLUSTER_NODE_HANDSHAKE,  "handshake,"},
    {CLUSTER_NODE_NOADDR,     "noaddr,"},
    {CLUSTER_NODE_NOFAILOVER, "nofailover,"}
};

/* Concatenate the comma separated list of node flags to the given SDS
 * string 'ci'. */
sds representClusterNodeFlags(sds ci, uint16_t flags) {
    size_t orig_len = sdslen(ci);
    int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
    for (i = 0; i < size; i++) {
        struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
        if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
    }
    /* If no flag was added, add the "noflags" special flag. */
    if (sdslen(ci) == orig_len) ci = sdscat(ci,"noflags,");
    sdsIncrLen(ci,-1); /* Remove trailing comma. */
    return ci;
}

/* Generate a csv-alike representation of the specified cluster node.
 * See clusterGenNodesDescription() top comment for more information.
 *
 * The function returns the string representation as an SDS string. */
sds clusterGenNodeDescription(clusterNode *node) {
    int j, start;
    sds ci;

    /* Node coordinates */
    ci = sdscatprintf(sdsempty(),"%.40s %s:%d@%d ",
        node->name,
        node->ip,
        node->port,
        node->cport);

    /* Flags */
    ci = representClusterNodeFlags(ci, node->flags);

    /* Slave of... or just "-" */
    if (node->slaveof)
        ci = sdscatprintf(ci," %.40s ",node->slaveof->name);
    else
        ci = sdscatlen(ci," - ",3);

    /* Latency from the POV of this node, config epoch, link status */
    ci = sdscatprintf(ci,"%lld %lld %llu %s",
        (long long) node->ping_sent,
        (long long) node->pong_received,
        (unsigned long long) node->configEpoch,
        (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
                    "connected" : "disconnected");

    /* Slots served by this instance */
    start = -1;
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        int bit;

        if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
            if (start == -1) start = j;
        }
        if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
            if (bit && j == CLUSTER_SLOTS-1) j++;

            if (start == j-1) {
                ci = sdscatprintf(ci," %d",start);
            } else {
                ci = sdscatprintf(ci," %d-%d",start,j-1);
            }
            start = -1;
        }
    }

    /* Just for the MYSELF node we also dump info about slots that
     * we are migrating to other instances or importing from other
     * instances. */
    if (node->flags & CLUSTER_NODE_MYSELF) {
        for (j = 0; j < CLUSTER_SLOTS; j++) {
            if (server.cluster->migrating_slots_to[j]) {
                ci = sdscatprintf(ci," [%d->-%.40s]",j,
                    server.cluster->migrating_slots_to[j]->name);
            } else if (server.cluster->importing_slots_from[j]) {
                ci = sdscatprintf(ci," [%d-<-%.40s]",j,
                    server.cluster->importing_slots_from[j]->name);
            }
        }
    }
    return ci;
}
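
/* Example of a line produced by the function above (illustrative, with a
 * made-up node ID and timestamps):
 *
 *   e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:7000@17000
 *   myself,master - 0 1426238316232 2 connected 5461-10922
 *
 * i.e. <id> <ip:port@cport> <flags> <master> <ping-sent> <pong-recv>
 * <config-epoch> <link-state> <slot> <slot> ... on a single line. */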

/* Generate a csv-alike representation of the nodes we are aware of,
 * including the "myself" node, and return an SDS string containing the
 * representation (it is up to the caller to free it).
 *
 * All the nodes matching at least one of the node flags specified in
 * "filter" are excluded from the output, so using zero as a filter will
 * include all the known nodes in the representation, including nodes in
 * the HANDSHAKE state.
 *
 * The representation obtained using this function is used for the output
 * of the CLUSTER NODES command, and as the format of the cluster
 * configuration file (nodes.conf) for a given node. */
sds clusterGenNodesDescription(int filter) {
    sds ci = sdsempty(), ni;
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node->flags & filter) continue;
        ni = clusterGenNodeDescription(node);
        ci = sdscatsds(ci,ni);
        sdsfree(ni);
        ci = sdscatlen(ci,"\n",1);
    }
    dictReleaseIterator(di);
    return ci;
}

/* -----------------------------------------------------------------------------
 * CLUSTER command
 * -------------------------------------------------------------------------- */

const char *clusterGetMessageTypeString(int type) {
    switch(type) {
    case CLUSTERMSG_TYPE_PING: return "ping";
    case CLUSTERMSG_TYPE_PONG: return "pong";
    case CLUSTERMSG_TYPE_MEET: return "meet";
    case CLUSTERMSG_TYPE_FAIL: return "fail";
    case CLUSTERMSG_TYPE_PUBLISH: return "publish";
    case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
    case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
    case CLUSTERMSG_TYPE_UPDATE: return "update";
    case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
    case CLUSTERMSG_TYPE_MODULE: return "module";
    }
    return "unknown";
}

int getSlotOrReply(client *c, robj *o) {
    long long slot;

    if (getLongLongFromObject(o,&slot) != C_OK ||
        slot < 0 || slot >= CLUSTER_SLOTS)
    {
        addReplyError(c,"Invalid or out of range slot");
        return -1;
    }
    return (int) slot;
}

void clusterReplyMultiBulkSlots(client *c) {
    /* Format: 1) 1) start slot
     *            2) end slot
     *            3) 1) master IP
     *               2) master port
     *               3) node ID
     *            4) 1) replica IP
     *               2) replica port
     *               3) node ID
     *           ... continued until done
     */
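
    /* Illustrative reply (not from the upstream source) as rendered by
     * redis-cli, for a single master serving slots 0-5460 with one replica
     * (node IDs are made up):
     *
     *   1) 1) (integer) 0
     *      2) (integer) 5460
     *      3) 1) "127.0.0.1"
     *         2) (integer) 7000
     *         3) "09dbe9720cda62f7865eabc5fd8857c5d2678366"
     *      4) 1) "127.0.0.1"
     *         2) (integer) 7003
     *         3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf" */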

    int num_masters = 0;
    void *slot_replylen = addDeferredMultiBulkLength(c);

    dictEntry *de;
    dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        int j = 0, start = -1;

        /* Skip slaves (that are iterated when producing the output of their
         * master) and masters not serving any slot. */
        if (!nodeIsMaster(node) || node->numslots == 0) continue;

        for (j = 0; j < CLUSTER_SLOTS; j++) {
            int bit, i;

            if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
                if (start == -1) start = j;
            }
            if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
                int nested_elements = 3; /* slots (2) + master addr (1). */
                void *nested_replylen = addDeferredMultiBulkLength(c);

                if (bit && j == CLUSTER_SLOTS-1) j++;

                /* If the slot exists in the output map, add to its list.
                 * Else, create a new output map for this slot. */
                if (start == j-1) {
                    addReplyLongLong(c, start); /* only one slot; low==high */
                    addReplyLongLong(c, start);
                } else {
                    addReplyLongLong(c, start); /* low */
                    addReplyLongLong(c, j-1);   /* high */
                }
                start = -1;

                /* First node reply position is always the master */
                addReplyMultiBulkLen(c, 3);
                addReplyBulkCString(c, node->ip);
                addReplyLongLong(c, node->port);
                addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);

                /* Remaining nodes in reply are replicas for slot range */
                for (i = 0; i < node->numslaves; i++) {
                    /* This loop is copy/pasted from clusterGenNodeDescription()
                     * with modifications for per-slot node aggregation */
                    if (nodeFailed(node->slaves[i])) continue;
                    addReplyMultiBulkLen(c, 3);
                    addReplyBulkCString(c, node->slaves[i]->ip);
                    addReplyLongLong(c, node->slaves[i]->port);
                    addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
                    nested_elements++;
                }
                setDeferredMultiBulkLength(c, nested_replylen, nested_elements);
                num_masters++;
            }
        }
    }
    dictReleaseIterator(di);
    setDeferredMultiBulkLength(c, slot_replylen, num_masters);
}

void clusterCommand(client *c) {
    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }

    if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
        const char *help[] = {
"ADDSLOTS <slot> [slot ...] -- Assign slots to current node.",
"BUMPEPOCH -- Advance the cluster config epoch.",
"COUNT-failure-reports <node-id> -- Return number of failure reports for <node-id>.",
"COUNTKEYSINSLOT <slot> -- Return the number of keys in <slot>.",
"DELSLOTS <slot> [slot ...] -- Delete slots information from current node.",
"FAILOVER [force|takeover] -- Promote current replica node to being a master.",
"FORGET <node-id> -- Remove a node from the cluster.",
"GETKEYSINSLOT <slot> <count> -- Return key names stored by current node in a slot.",
"FLUSHSLOTS -- Delete current node own slots information.",
"INFO -- Return information about the cluster.",
"KEYSLOT <key> -- Return the hash slot for <key>.",
"MEET <ip> <port> [bus-port] -- Connect nodes into a working cluster.",
"MYID -- Return the node id.",
"NODES -- Return cluster configuration seen by node. Output format:",
"    <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ... <slot>",
"REPLICATE <node-id> -- Configure current node as replica to <node-id>.",
"RESET [hard|soft] -- Reset current node (default: soft).",
"SET-config-epoch <epoch> -- Set config epoch of current node.",
"SETSLOT <slot> (importing|migrating|stable|node <node-id>) -- Set slot state.",
"REPLICAS <node-id> -- Return <node-id> replicas.",
"SLOTS -- Return information about slots range mappings. Each range is made of:",
"    start, end, master and replicas IP addresses, ports and ids",
NULL
        };
        addReplyHelp(c, help);
    } else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
        /* CLUSTER MEET <ip> <port> [cport] */
        long long port, cport;

        if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
            addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
                                (char*)c->argv[3]->ptr);
            return;
        }

        if (c->argc == 5) {
            if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
                addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
                                    (char*)c->argv[4]->ptr);
                return;
            }
        } else {
            cport = port + CLUSTER_PORT_INCR;
        }

        if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
            errno == EINVAL)
        {
            addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
        } else {
            addReply(c,shared.ok);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
        /* CLUSTER NODES */
        robj *o;
        sds ci = clusterGenNodesDescription(0);

        o = createObject(OBJ_STRING,ci);
        addReplyBulk(c,o);
        decrRefCount(o);
4260 } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
4261 /* CLUSTER MYID */
4262 addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
4263 } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
4264 /* CLUSTER SLOTS */
4265 clusterReplyMultiBulkSlots(c);
4266 } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
4267 /* CLUSTER FLUSHSLOTS */
4268 if (dictSize(server.db[0].dict) != 0) {
4269 addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
4270 return;
4271 }
4272 clusterDelNodeSlots(myself);
4273 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4274 addReply(c,shared.ok);
4275 } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
4276 !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
4277 {
4278 /* CLUSTER ADDSLOTS <slot> [slot] ... */
4279 /* CLUSTER DELSLOTS <slot> [slot] ... */
4280 int j, slot;
4281 unsigned char *slots = zmalloc(CLUSTER_SLOTS);
4282 int del = !strcasecmp(c->argv[1]->ptr,"delslots");
4283
4284 memset(slots,0,CLUSTER_SLOTS);
4285 /* Check that all the arguments are parseable and that no slot is
4286 * already busy (or, for DELSLOTS, already unassigned). */
4287 for (j = 2; j < c->argc; j++) {
4288 if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
4289 zfree(slots);
4290 return;
4291 }
4292 if (del && server.cluster->slots[slot] == NULL) {
4293 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
4294 zfree(slots);
4295 return;
4296 } else if (!del && server.cluster->slots[slot]) {
4297 addReplyErrorFormat(c,"Slot %d is already busy", slot);
4298 zfree(slots);
4299 return;
4300 }
4301 if (slots[slot]++ == 1) {
4302 addReplyErrorFormat(c,"Slot %d specified multiple times",
4303 (int)slot);
4304 zfree(slots);
4305 return;
4306 }
4307 }
4308 for (j = 0; j < CLUSTER_SLOTS; j++) {
4309 if (slots[j]) {
4310 int retval;
4311
4312 /* If this slot was set as importing we can clear this
4313 * state as now we are the real owner of the slot. */
4314 if (server.cluster->importing_slots_from[j])
4315 server.cluster->importing_slots_from[j] = NULL;
4316
4317 retval = del ? clusterDelSlot(j) :
4318 clusterAddSlot(myself,j);
4319 serverAssertWithInfo(c,NULL,retval == C_OK);
4320 }
4321 }
4322 zfree(slots);
4323 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4324 addReply(c,shared.ok);
4325 } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
4326 /* SETSLOT 10 MIGRATING <node ID> */
4327 /* SETSLOT 10 IMPORTING <node ID> */
4328 /* SETSLOT 10 STABLE */
4329 /* SETSLOT 10 NODE <node ID> */
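/* For illustration, a typical manual resharding of slot 10 from node A to
 * node B (IDs and addresses hypothetical) drives these subcommands in order:
 *
 *   1) On B: CLUSTER SETSLOT 10 IMPORTING <A-id>
 *   2) On A: CLUSTER SETSLOT 10 MIGRATING <B-id>
 *   3) On A: MIGRATE <B-ip> <B-port> "" 0 <timeout> KEYS <keys in slot 10>
 *   4) On both (and ideally every master): CLUSTER SETSLOT 10 NODE <B-id>
 */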
4330 int slot;
4331 clusterNode *n;
4332
4333 if (nodeIsSlave(myself)) {
4334 addReplyError(c,"Please use SETSLOT only with masters.");
4335 return;
4336 }
4337
4338 if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
4339
4340 if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
4341 if (server.cluster->slots[slot] != myself) {
4342 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
4343 return;
4344 }
4345 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4346 addReplyErrorFormat(c,"I don't know about node %s",
4347 (char*)c->argv[4]->ptr);
4348 return;
4349 }
4350 server.cluster->migrating_slots_to[slot] = n;
4351 } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
4352 if (server.cluster->slots[slot] == myself) {
4353 addReplyErrorFormat(c,
4354 "I'm already the owner of hash slot %u",slot);
4355 return;
4356 }
4357 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4358 addReplyErrorFormat(c,"I don't know about node %s",
4359 (char*)c->argv[4]->ptr);
4360 return;
4361 }
4362 server.cluster->importing_slots_from[slot] = n;
4363 } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
4364 /* CLUSTER SETSLOT <SLOT> STABLE */
4365 server.cluster->importing_slots_from[slot] = NULL;
4366 server.cluster->migrating_slots_to[slot] = NULL;
4367 } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
4368 /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
4369 clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
4370
4371 if (!n) {
4372 addReplyErrorFormat(c,"Unknown node %s",
4373 (char*)c->argv[4]->ptr);
4374 return;
4375 }
4376 /* If this hash slot was served by 'myself' before the switch,
4377 * make sure there are no longer local keys for this hash slot. */
4378 if (server.cluster->slots[slot] == myself && n != myself) {
4379 if (countKeysInSlot(slot) != 0) {
4380 addReplyErrorFormat(c,
4381 "Can't assign hashslot %d to a different node "
4382 "while I still hold keys for this hash slot.", slot);
4383 return;
4384 }
4385 }
4386 /* If this slot is in migrating status but we have no keys
4387 * for it, assigning the slot to another node will clear
4388 * the migrating status. */
4389 if (countKeysInSlot(slot) == 0 &&
4390 server.cluster->migrating_slots_to[slot])
4391 server.cluster->migrating_slots_to[slot] = NULL;
4392
4393 /* If this node was importing this slot, assigning the slot to
4394 * itself also clears the importing status. */
4395 if (n == myself &&
4396 server.cluster->importing_slots_from[slot])
4397 {
4398 /* This slot was manually migrated, set this node configEpoch
4399 * to a new epoch so that the new version can be propagated
4400 * by the cluster.
4401 *
4402 * Note that if this ever results in a collision with another
4403 * node getting the same configEpoch, for example because a
4404 * failover happens at the same time we close the slot, the
4405 * configEpoch collision resolution will fix it by assigning
4406 * a different epoch to each node. */
4407 if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
4408 serverLog(LL_WARNING,
4409 "configEpoch updated after importing slot %d", slot);
4410 }
4411 server.cluster->importing_slots_from[slot] = NULL;
4412 }
4413 clusterDelSlot(slot);
4414 clusterAddSlot(n,slot);
4415 } else {
4416 addReplyError(c,
4417 "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
4418 return;
4419 }
4420 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
4421 addReply(c,shared.ok);
4422 } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
4423 /* CLUSTER BUMPEPOCH */
4424 int retval = clusterBumpConfigEpochWithoutConsensus();
4425 sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
4426 (retval == C_OK) ? "BUMPED" : "STILL",
4427 (unsigned long long) myself->configEpoch);
4428 addReplySds(c,reply);
4429 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
4430 /* CLUSTER INFO */
4431 char *statestr[] = {"ok","fail","needhelp"};
4432 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
4433 uint64_t myepoch;
4434 int j;
4435
4436 for (j = 0; j < CLUSTER_SLOTS; j++) {
4437 clusterNode *n = server.cluster->slots[j];
4438
4439 if (n == NULL) continue;
4440 slots_assigned++;
4441 if (nodeFailed(n)) {
4442 slots_fail++;
4443 } else if (nodeTimedOut(n)) {
4444 slots_pfail++;
4445 } else {
4446 slots_ok++;
4447 }
4448 }
4449
4450 myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
4451 myself->slaveof->configEpoch : myself->configEpoch;
4452
4453 sds info = sdscatprintf(sdsempty(),
4454 "cluster_state:%s\r\n"
4455 "cluster_slots_assigned:%d\r\n"
4456 "cluster_slots_ok:%d\r\n"
4457 "cluster_slots_pfail:%d\r\n"
4458 "cluster_slots_fail:%d\r\n"
4459 "cluster_known_nodes:%lu\r\n"
4460 "cluster_size:%d\r\n"
4461 "cluster_current_epoch:%llu\r\n"
4462 "cluster_my_epoch:%llu\r\n"
4463 , statestr[server.cluster->state],
4464 slots_assigned,
4465 slots_ok,
4466 slots_pfail,
4467 slots_fail,
4468 dictSize(server.cluster->nodes),
4469 server.cluster->size,
4470 (unsigned long long) server.cluster->currentEpoch,
4471 (unsigned long long) myepoch
4472 );
4473
4474 /* Show stats about messages sent and received. */
4475 long long tot_msg_sent = 0;
4476 long long tot_msg_received = 0;
4477
4478 for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
4479 if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
4480 tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
4481 info = sdscatprintf(info,
4482 "cluster_stats_messages_%s_sent:%lld\r\n",
4483 clusterGetMessageTypeString(i),
4484 server.cluster->stats_bus_messages_sent[i]);
4485 }
4486 info = sdscatprintf(info,
4487 "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);
4488
4489 for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
4490 if (server.cluster->stats_bus_messages_received[i] == 0) continue;
4491 tot_msg_received += server.cluster->stats_bus_messages_received[i];
4492 info = sdscatprintf(info,
4493 "cluster_stats_messages_%s_received:%lld\r\n",
4494 clusterGetMessageTypeString(i),
4495 server.cluster->stats_bus_messages_received[i]);
4496 }
4497 info = sdscatprintf(info,
4498 "cluster_stats_messages_received:%lld\r\n", tot_msg_received);
4499
4500 /* Produce the reply protocol. */
4501 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
4502 (unsigned long)sdslen(info)));
4503 addReplySds(c,info);
4504 addReply(c,shared.crlf);
4505 } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
4506 int retval = clusterSaveConfig(1);
4507
4508 if (retval == 0)
4509 addReply(c,shared.ok);
4510 else
4511 addReplyErrorFormat(c,"error saving the cluster node config: %s",
4512 strerror(errno));
4513 } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
4514 /* CLUSTER KEYSLOT <key> */
4515 sds key = c->argv[2]->ptr;
4516
4517 addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
4518 } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
4519 /* CLUSTER COUNTKEYSINSLOT <slot> */
4520 long long slot;
4521
4522 if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4523 return;
4524 if (slot < 0 || slot >= CLUSTER_SLOTS) {
4525 addReplyError(c,"Invalid slot");
4526 return;
4527 }
4528 addReplyLongLong(c,countKeysInSlot(slot));
4529 } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
4530 /* CLUSTER GETKEYSINSLOT <slot> <count> */
4531 long long maxkeys, slot;
4532 unsigned int numkeys, j;
4533 robj **keys;
4534
4535 if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4536 return;
4537 if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
4538 != C_OK)
4539 return;
4540 if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
4541 addReplyError(c,"Invalid slot or number of keys");
4542 return;
4543 }
4544
4545 /* Avoid allocating more than needed in case of large COUNT argument
4546 * and smaller actual number of keys. */
4547 unsigned int keys_in_slot = countKeysInSlot(slot);
4548 if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;
4549
4550 keys = zmalloc(sizeof(robj*)*maxkeys);
4551 numkeys = getKeysInSlot(slot, keys, maxkeys);
4552 addReplyMultiBulkLen(c,numkeys);
4553 for (j = 0; j < numkeys; j++) {
4554 addReplyBulk(c,keys[j]);
4555 decrRefCount(keys[j]);
4556 }
4557 zfree(keys);
4558 } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
4559 /* CLUSTER FORGET <NODE ID> */
4560 clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4561
4562 if (!n) {
4563 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4564 return;
4565 } else if (n == myself) {
4566 addReplyError(c,"I tried hard but I can't forget myself...");
4567 return;
4568 } else if (nodeIsSlave(myself) && myself->slaveof == n) {
4569 addReplyError(c,"Can't forget my master!");
4570 return;
4571 }
4572 clusterBlacklistAddNode(n);
4573 clusterDelNode(n);
4574 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4575 CLUSTER_TODO_SAVE_CONFIG);
4576 addReply(c,shared.ok);
4577 } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
4578 /* CLUSTER REPLICATE <NODE ID> */
4579 clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4580
4581 /* Lookup the specified node in our table. */
4582 if (!n) {
4583 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4584 return;
4585 }
4586
4587 /* I can't replicate myself. */
4588 if (n == myself) {
4589 addReplyError(c,"Can't replicate myself");
4590 return;
4591 }
4592
4593 /* Can't replicate a slave. */
4594 if (nodeIsSlave(n)) {
4595 addReplyError(c,"I can only replicate a master, not a replica.");
4596 return;
4597 }
4598
4599 /* If the instance is currently a master, it must have no assigned
4600 * slots and no keys in order to start replicating some other node.
4601 * Slaves can switch to another master without issues. */
4602 if (nodeIsMaster(myself) &&
4603 (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
4604 addReplyError(c,
4605 "To set a master the node must be empty and "
4606 "without assigned slots.");
4607 return;
4608 }
4609
4610 /* Set the master. */
4611 clusterSetMaster(n);
4612 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4613 addReply(c,shared.ok);
4614 } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
4615 !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
4616 /* CLUSTER SLAVES <NODE ID> */
4617 clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4618 int j;
4619
4620 /* Lookup the specified node in our table. */
4621 if (!n) {
4622 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4623 return;
4624 }
4625
4626 if (nodeIsSlave(n)) {
4627 addReplyError(c,"The specified node is not a master");
4628 return;
4629 }
4630
4631 addReplyMultiBulkLen(c,n->numslaves);
4632 for (j = 0; j < n->numslaves; j++) {
4633 sds ni = clusterGenNodeDescription(n->slaves[j]);
4634 addReplyBulkCString(c,ni);
4635 sdsfree(ni);
4636 }
4637 } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
4638 c->argc == 3)
4639 {
4640 /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
4641 clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4642
4643 if (!n) {
4644 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4645 return;
4646 } else {
4647 addReplyLongLong(c,clusterNodeFailureReportsCount(n));
4648 }
4649 } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
4650 (c->argc == 2 || c->argc == 3))
4651 {
4652 /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
4653 int force = 0, takeover = 0;
4654
4655 if (c->argc == 3) {
4656 if (!strcasecmp(c->argv[2]->ptr,"force")) {
4657 force = 1;
4658 } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
4659 takeover = 1;
4660 force = 1; /* Takeover also implies force. */
4661 } else {
4662 addReply(c,shared.syntaxerr);
4663 return;
4664 }
4665 }
4666
4667 /* Check preconditions. */
4668 if (nodeIsMaster(myself)) {
4669 addReplyError(c,"You should send CLUSTER FAILOVER to a replica");
4670 return;
4671 } else if (myself->slaveof == NULL) {
4672 addReplyError(c,"I'm a replica but my master is unknown to me");
4673 return;
4674 } else if (!force &&
4675 (nodeFailed(myself->slaveof) ||
4676 myself->slaveof->link == NULL))
4677 {
4678 addReplyError(c,"Master is down or failed, "
4679 "please use CLUSTER FAILOVER FORCE");
4680 return;
4681 }
4682 resetManualFailover();
4683 server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
4684
4685 if (takeover) {
4686 /* A takeover does not perform any initial check. It just
4687 * generates a new configuration epoch for this node without
4688 * consensus, claims the master's slots, and broadcasts the new
4689 * configuration. */
4690 serverLog(LL_WARNING,"Taking over the master (user request).");
4691 clusterBumpConfigEpochWithoutConsensus();
4692 clusterFailoverReplaceYourMaster();
4693 } else if (force) {
4694 /* If this is a forced failover, we don't need to talk with our
4695 * master to agree about the offset. We just failover taking over
4696 * it without coordination. */
4697 serverLog(LL_WARNING,"Forced failover user request accepted.");
4698 server.cluster->mf_can_start = 1;
4699 } else {
4700 serverLog(LL_WARNING,"Manual failover user request accepted.");
4701 clusterSendMFStart(myself->slaveof);
4702 }
4703 addReply(c,shared.ok);
4704 } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
4705 {
4706 /* CLUSTER SET-CONFIG-EPOCH <epoch>
4707 *
4708 * The user is allowed to set the config epoch only when a node is
4709 * totally fresh: no config epoch, no other known node, and so forth.
4710 * This happens at cluster creation time to start with a cluster where
4711 * every node has a different node ID, without relying on the conflict
4712 * resolution system, which is too slow when a big cluster is created. */
4713 long long epoch;
4714
4715 if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
4716 return;
4717
4718 if (epoch < 0) {
4719 addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
4720 } else if (dictSize(server.cluster->nodes) > 1) {
4721 addReplyError(c,"The user can assign a config epoch only when the "
4722 "node does not know any other node.");
4723 } else if (myself->configEpoch != 0) {
4724 addReplyError(c,"Node config epoch is already non-zero");
4725 } else {
4726 myself->configEpoch = epoch;
4727 serverLog(LL_WARNING,
4728 "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
4729 (unsigned long long) myself->configEpoch);
4730
4731 if (server.cluster->currentEpoch < (uint64_t)epoch)
4732 server.cluster->currentEpoch = epoch;
4733 /* No need to fsync the config here since in the unlucky event
4734 * of a failure to persist the config, the conflict resolution code
4735 * will assign a unique config epoch to this node. */
4736 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4737 CLUSTER_TODO_SAVE_CONFIG);
4738 addReply(c,shared.ok);
4739 }
4740 } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
4741 (c->argc == 2 || c->argc == 3))
4742 {
4743 /* CLUSTER RESET [SOFT|HARD] */
4744 int hard = 0;
4745
4746 /* Parse soft/hard argument. Default is soft. */
4747 if (c->argc == 3) {
4748 if (!strcasecmp(c->argv[2]->ptr,"hard")) {
4749 hard = 1;
4750 } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
4751 hard = 0;
4752 } else {
4753 addReply(c,shared.syntaxerr);
4754 return;
4755 }
4756 }
4757
4758 /* Slaves can be reset while containing data, but master nodes
4759 * must be empty. */
4760 if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
4761 addReplyError(c,"CLUSTER RESET can't be called with "
4762 "master nodes containing keys");
4763 return;
4764 }
4765 clusterReset(hard);
4766 addReply(c,shared.ok);
4767 } else {
4768 addReplySubcommandSyntaxError(c);
4769 return;
4770 }
4771 }
4772
4773 /* -----------------------------------------------------------------------------
4774 * DUMP, RESTORE and MIGRATE commands
4775 * -------------------------------------------------------------------------- */
4776
4777 /* Generates a DUMP-format representation of the object 'o', adding it to the
4778 * io stream pointed by 'rio'. This function can't fail. */
4779 void createDumpPayload(rio *payload, robj *o, robj *key) {
4780 unsigned char buf[2];
4781 uint64_t crc;
4782
4783 /* Serialize the object in an RDB-like format. It consists of an object type
4784 * byte followed by the serialized object. This is understood by RESTORE. */
4785 rioInitWithBuffer(payload,sdsempty());
4786 serverAssert(rdbSaveObjectType(payload,o));
4787 serverAssert(rdbSaveObject(payload,o,key));
4788
4789 /* Write the footer; this is what it looks like:
4790 * ----------------+---------------------+---------------+
4791 * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
4792 * ----------------+---------------------+---------------+
4793 * RDB version and CRC are both in little endian.
4794 */
4795
4796 /* RDB version */
4797 buf[0] = RDB_VERSION & 0xff;
4798 buf[1] = (RDB_VERSION >> 8) & 0xff;
4799 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
4800
4801 /* CRC64 */
4802 crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
4803 sdslen(payload->io.buffer.ptr));
4804 memrev64ifbe(&crc);
4805 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
4806 }
4807
4808 /* Verify that the RDB version of the dump payload is not newer than the
4809 * one of this Redis instance and that the checksum is ok.
4810 * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
4811 * is returned. */
4812 int verifyDumpPayload(unsigned char *p, size_t len) {
4813 unsigned char *footer;
4814 uint16_t rdbver;
4815 uint64_t crc;
4816
4817 /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
4818 if (len < 10) return C_ERR;
4819 footer = p+(len-10);
4820
4821 /* Verify RDB version */
4822 rdbver = (footer[1] << 8) | footer[0];
4823 if (rdbver > RDB_VERSION) return C_ERR;
4824
4825 /* Verify CRC64 */
4826 crc = crc64(0,p,len-8);
4827 memrev64ifbe(&crc);
4828 return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
4829 }
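/* As a worked example of the footer layout verified above: for a payload of
 * len == 25 bytes, footer points at byte 15; bytes 15-16 hold the RDB version
 * in little endian, and bytes 17-24 hold the CRC64 computed over the first
 * len-8 == 17 bytes, i.e. over the body plus the version field. */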
4830
4831 /* DUMP keyname
4832 * DUMP is actually not used by Redis Cluster but it is the obvious
4833 * complement of RESTORE and can be useful for different applications. */
4834 void dumpCommand(client *c) {
4835 robj *o, *dumpobj;
4836 rio payload;
4837
4838 /* Check if the key is here. */
4839 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
4840 addReply(c,shared.nullbulk);
4841 return;
4842 }
4843
4844 /* Create the DUMP encoded representation. */
4845 createDumpPayload(&payload,o,c->argv[1]);
4846
4847 /* Transfer to the client */
4848 dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
4849 addReplyBulk(c,dumpobj);
4850 decrRefCount(dumpobj);
4851 return;
4852 }
4853
4854 /* RESTORE key ttl serialized-value [REPLACE] */
4855 void restoreCommand(client *c) {
4856 long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
4857 rio payload;
4858 int j, type, replace = 0, absttl = 0;
4859 robj *obj;
4860
4861 /* Parse additional options */
4862 for (j = 4; j < c->argc; j++) {
4863 int additional = c->argc-j-1;
4864 if (!strcasecmp(c->argv[j]->ptr,"replace")) {
4865 replace = 1;
4866 } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
4867 absttl = 1;
4868 } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
4869 lfu_freq == -1)
4870 {
4871 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
4872 != C_OK) return;
4873 if (lru_idle < 0) {
4874 addReplyError(c,"Invalid IDLETIME value, must be >= 0");
4875 return;
4876 }
4877 lru_clock = LRU_CLOCK();
4878 j++; /* Consume additional arg. */
4879 } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
4880 lru_idle == -1)
4881 {
4882 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
4883 != C_OK) return;
4884 if (lfu_freq < 0 || lfu_freq > 255) {
4885 addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
4886 return;
4887 }
4888 j++; /* Consume additional arg. */
4889 } else {
4890 addReply(c,shared.syntaxerr);
4891 return;
4892 }
4893 }
4894
4895 /* Make sure this key does not already exist here... */
4896 if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
4897 addReply(c,shared.busykeyerr);
4898 return;
4899 }
4900
4901 /* Check if the TTL value makes sense */
4902 if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
4903 return;
4904 } else if (ttl < 0) {
4905 addReplyError(c,"Invalid TTL value, must be >= 0");
4906 return;
4907 }
4908
4909 /* Verify RDB version and data checksum. */
4910 if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
4911 {
4912 addReplyError(c,"DUMP payload version or checksum are wrong");
4913 return;
4914 }
4915
4916 rioInitWithBuffer(&payload,c->argv[3]->ptr);
4917 if (((type = rdbLoadObjectType(&payload)) == -1) ||
4918 ((obj = rdbLoadObject(type,&payload,c->argv[1])) == NULL))
4919 {
4920 addReplyError(c,"Bad data format");
4921 return;
4922 }
4923
4924 /* Remove the old key if needed. */
4925 if (replace) dbDelete(c->db,c->argv[1]);
4926
4927 /* Create the key and set the TTL if any */
4928 dbAdd(c->db,c->argv[1],obj);
4929 if (ttl) {
4930 if (!absttl) ttl+=mstime();
4931 setExpire(c,c->db,c->argv[1],ttl);
4932 }
4933 objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
4934 signalModifiedKey(c->db,c->argv[1]);
4935 addReply(c,shared.ok);
4936 server.dirty++;
4937 }
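/* Minimal sketch of a DUMP/RESTORE round trip (key name illustrative,
 * <serialized-value> stands for the opaque blob produced by DUMP):
 *
 *   DUMP user:1000                            -> <serialized-value>
 *   RESTORE user:1000 0 <serialized-value>    -> OK, or a BUSYKEY error if
 *                                                the key already exists
 *   RESTORE user:1000 0 <serialized-value> REPLACE IDLETIME 3600
 *                                             -> OK, overwriting the key and
 *                                                priming its LRU idle time */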
4938
4939 /* MIGRATE socket cache implementation.
4940 *
4941 * We keep a map between host:port pairs and the TCP sockets we used to
4942 * connect to those instances recently.
4943 * These sockets are closed when the max number we cache is reached, and also
4944 * in serverCron() when they have been unused for more than a few seconds. */
4945 #define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
4946 #define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
4947
4948 typedef struct migrateCachedSocket {
4949 int fd;
4950 long last_dbid;
4951 time_t last_use_time;
4952 } migrateCachedSocket;
4953
4954 /* Return a migrateCachedSocket containing a TCP socket connected with the
4955 * target instance, possibly returning a cached one.
4956 *
4957 * This function is responsible for sending errors to the client if a
4958 * connection can't be established. In this case NULL is returned.
4959 * Otherwise on success the socket is returned, and the caller should not
4960 * attempt to free it after usage.
4961 *
4962 * If the caller detects an error while using the socket, migrateCloseSocket()
4963 * should be called so that the connection will be created from scratch
4964 * the next time. */
4965 migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
4966 int fd;
4967 sds name = sdsempty();
4968 migrateCachedSocket *cs;
4969
4970 /* Check if we have an already cached socket for this ip:port pair. */
4971 name = sdscatlen(name,host->ptr,sdslen(host->ptr));
4972 name = sdscatlen(name,":",1);
4973 name = sdscatlen(name,port->ptr,sdslen(port->ptr));
4974 cs = dictFetchValue(server.migrate_cached_sockets,name);
4975 if (cs) {
4976 sdsfree(name);
4977 cs->last_use_time = server.unixtime;
4978 return cs;
4979 }
4980
4981 /* No cached socket, create one. */
4982 if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
4983 /* Too many items, drop one at random. */
4984 dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
4985 cs = dictGetVal(de);
4986 close(cs->fd);
4987 zfree(cs);
4988 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
4989 }
4990
4991 /* Create the socket */
4992 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
4993 atoi(c->argv[2]->ptr));
4994 if (fd == -1) {
4995 sdsfree(name);
4996 addReplyErrorFormat(c,"Can't connect to target node: %s",
4997 server.neterr);
4998 return NULL;
4999 }
5000 anetEnableTcpNoDelay(server.neterr,fd);
5001
5002 /* Check if it connects within the specified timeout. */
5003 if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
5004 sdsfree(name);
5005 addReplySds(c,
5006 sdsnew("-IOERR error or timeout connecting to the client\r\n"));
5007 close(fd);
5008 return NULL;
5009 }
5010
5011 /* Add to the cache and return it to the caller. */
5012 cs = zmalloc(sizeof(*cs));
5013 cs->fd = fd;
5014 cs->last_dbid = -1;
5015 cs->last_use_time = server.unixtime;
5016 dictAdd(server.migrate_cached_sockets,name,cs);
5017 return cs;
5018 }
5019
5020 /* Free a migrate cached connection. */
5021 void migrateCloseSocket(robj *host, robj *port) {
5022 sds name = sdsempty();
5023 migrateCachedSocket *cs;
5024
5025 name = sdscatlen(name,host->ptr,sdslen(host->ptr));
5026 name = sdscatlen(name,":",1);
5027 name = sdscatlen(name,port->ptr,sdslen(port->ptr));
5028 cs = dictFetchValue(server.migrate_cached_sockets,name);
5029 if (!cs) {
5030 sdsfree(name);
5031 return;
5032 }
5033
5034 close(cs->fd);
5035 zfree(cs);
5036 dictDelete(server.migrate_cached_sockets,name);
5037 sdsfree(name);
5038 }
5039
5040 void migrateCloseTimedoutSockets(void) {
5041 dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
5042 dictEntry *de;
5043
5044 while((de = dictNext(di)) != NULL) {
5045 migrateCachedSocket *cs = dictGetVal(de);
5046
5047 if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
5048 close(cs->fd);
5049 zfree(cs);
5050 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
5051 }
5052 }
5053 dictReleaseIterator(di);
5054 }
5055
5056 /* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password]
5057 *
5058 * Or in the multiple keys form:
5059 *
5060 * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password] KEYS key1
5061 * key2 ... keyN */
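/* For example (host, port and key names are illustrative):
 *
 *   MIGRATE 10.0.0.2 6380 user:1000 0 5000 REPLACE
 *   MIGRATE 10.0.0.2 6380 "" 0 5000 COPY KEYS user:1 user:2 user:3 */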
5062 void migrateCommand(client *c) {
5063 migrateCachedSocket *cs;
5064 int copy = 0, replace = 0, j;
5065 char *password = NULL;
5066 long timeout;
5067 long dbid;
5068 robj **ov = NULL; /* Objects to migrate. */
5069 robj **kv = NULL; /* Key names. */
5070 robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
5071 rio cmd, payload;
5072 int may_retry = 1;
5073 int write_error = 0;
5074 int argv_rewritten = 0;
5075
5076 /* To support the KEYS option we need the following additional state. */
5077 int first_key = 3; /* Argument index of the first key. */
5078 int num_keys = 1; /* By default only migrate the 'key' argument. */
5079
5080 /* Parse additional options */
5081 for (j = 6; j < c->argc; j++) {
5082 int moreargs = j < c->argc-1;
5083 if (!strcasecmp(c->argv[j]->ptr,"copy")) {
5084 copy = 1;
5085 } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
5086 replace = 1;
5087 } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
5088 if (!moreargs) {
5089 addReply(c,shared.syntaxerr);
5090 return;
5091 }
5092 j++;
5093 password = c->argv[j]->ptr;
5094 } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
5095 if (sdslen(c->argv[3]->ptr) != 0) {
5096 addReplyError(c,
5097 "When using MIGRATE KEYS option, the key argument"
5098 " must be set to the empty string");
5099 return;
5100 }
5101 first_key = j+1;
5102 num_keys = c->argc - j - 1;
5103 break; /* All the remaining args are keys. */
5104 } else {
5105 addReply(c,shared.syntaxerr);
5106 return;
5107 }
5108 }
5109
5110 /* Sanity check */
5111 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
5112 getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
5113 {
5114 return;
5115 }
5116 if (timeout <= 0) timeout = 1000;
5117
5118 /* Check if the keys are here. If at least one key is to migrate, do it;
5119 * otherwise, if all the keys are missing, reply with "NOKEY" to signal
5120 * the caller there was nothing to migrate. We don't return an error in
5121 * this case, since often this is due to a normal condition like the key
5122 * expiring in the meantime. */
5123 ov = zrealloc(ov,sizeof(robj*)*num_keys);
5124 kv = zrealloc(kv,sizeof(robj*)*num_keys);
5125 int oi = 0;
5126
5127 for (j = 0; j < num_keys; j++) {
5128 if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
5129 kv[oi] = c->argv[first_key+j];
5130 oi++;
5131 }
5132 }
5133 num_keys = oi;
5134 if (num_keys == 0) {
5135 zfree(ov); zfree(kv);
5136 addReplySds(c,sdsnew("+NOKEY\r\n"));
5137 return;
5138 }
5139
5140 try_again:
5141 write_error = 0;
5142
5143 /* Connect */
5144 cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
5145 if (cs == NULL) {
5146 zfree(ov); zfree(kv);
5147 return; /* error sent to the client by migrateGetSocket() */
5148 }
5149
5150 rioInitWithBuffer(&cmd,sdsempty());
5151
5152 /* Authentication */
5153 if (password) {
5154 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
5155 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
5156 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
5157 sdslen(password)));
5158 }
5159
5160 /* Send the SELECT command if the current DB is not already selected. */
5161 int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
5162 if (select) {
5163 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
5164 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
5165 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
5166 }
5167
5168 int non_expired = 0; /* Number of keys that we'll find non expired.
5169 Note that serializing large keys may take some time,
5170 so keys that were found non expired by the
5171 lookupKey() call may have expired by now. */
5172
5173 /* Create RESTORE payload and generate the protocol to call the command. */
5174 for (j = 0; j < num_keys; j++) {
5175 long long ttl = 0;
5176 long long expireat = getExpire(c->db,kv[j]);
5177
5178 if (expireat != -1) {
5179 ttl = expireat-mstime();
5180 if (ttl < 0) {
5181 continue;
5182 }
5183 if (ttl < 1) ttl = 1;
5184 }
5185
5186 /* Relocate valid (non expired) keys into the array in successive
5187 * positions to remove holes created by the keys that were present
5188 * in the first lookup but are now expired after the second lookup. */
5189 kv[non_expired++] = kv[j];
5190
5191 serverAssertWithInfo(c,NULL,
5192 rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
5193
5194 if (server.cluster_enabled)
5195 serverAssertWithInfo(c,NULL,
5196 rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
5197 else
5198 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
5199 serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
5200 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
5201 sdslen(kv[j]->ptr)));
5202 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
5203
5204 /* Emit the payload argument, that is the serialized object using
5205 * the DUMP format. */
5206 createDumpPayload(&payload,ov[j],kv[j]);
5207 serverAssertWithInfo(c,NULL,
5208 rioWriteBulkString(&cmd,payload.io.buffer.ptr,
5209 sdslen(payload.io.buffer.ptr)));
5210 sdsfree(payload.io.buffer.ptr);
5211
5212 /* Add the REPLACE option to the RESTORE command if it was specified
5213 * as a MIGRATE option. */
5214 if (replace)
5215 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
5216 }
5217
5218 /* Fix the actual number of keys we are migrating. */
5219 num_keys = non_expired;
5220
5221 /* Transfer the query to the other node in 64K chunks. */
5222 errno = 0;
5223 {
5224 sds buf = cmd.io.buffer.ptr;
5225 size_t pos = 0, towrite;
5226 int nwritten = 0;
5227
5228 while ((towrite = sdslen(buf)-pos) > 0) {
5229 towrite = (towrite > (64*1024) ? (64*1024) : towrite);
5230 nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
5231 if (nwritten != (signed)towrite) {
5232 write_error = 1;
5233 goto socket_err;
5234 }
5235 pos += nwritten;
5236 }
5237 }
5238
5239 char buf0[1024]; /* Auth reply. */
5240 char buf1[1024]; /* Select reply. */
5241 char buf2[1024]; /* Restore reply. */
5242
5243 /* Read the AUTH reply if needed. */
5244 if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0)
5245 goto socket_err;
5246
5247 /* Read the SELECT reply if needed. */
5248 if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
5249 goto socket_err;
5250
5251 /* Read the RESTORE replies. */
5252 int error_from_target = 0;
5253 int socket_error = 0;
5254 int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
5255
5256 /* Allocate the new argument vector that will replace the current command,
5257 * to propagate the MIGRATE as a DEL command (if no COPY option was given).
5258 * We allocate num_keys+1 because the additional argument is for "DEL"
5259 * command name itself. */
5260 if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
5261
5262 for (j = 0; j < num_keys; j++) {
5263 if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
5264 socket_error = 1;
5265 break;
5266 }
5267 if ((password && buf0[0] == '-') ||
5268 (select && buf1[0] == '-') ||
5269 buf2[0] == '-')
5270 {
5271 /* On error assume that last_dbid is no longer valid. */
5272 if (!error_from_target) {
5273 cs->last_dbid = -1;
5274 char *errbuf;
5275 if (password && buf0[0] == '-') errbuf = buf0;
5276 else if (select && buf1[0] == '-') errbuf = buf1;
5277 else errbuf = buf2;
5278
5279 error_from_target = 1;
5280 addReplyErrorFormat(c,"Target instance replied with error: %s",
5281 errbuf+1);
5282 }
5283 } else {
5284 if (!copy) {
5285 /* No COPY option: remove the local key, signal the change. */
5286 dbDelete(c->db,kv[j]);
5287 signalModifiedKey(c->db,kv[j]);
5288 server.dirty++;
5289
5290 /* Populate the argument vector to replace the old one. */
5291 newargv[del_idx++] = kv[j];
5292 incrRefCount(kv[j]);
5293 }
5294 }
5295 }
5296
5297 /* On socket error, if we want to retry, do it now before rewriting the
5298 * command vector. We only retry if we are sure nothing was processed
5299 * and we failed to read the first reply (j == 0 test). */
5300 if (!error_from_target && socket_error && j == 0 && may_retry &&
5301 errno != ETIMEDOUT)
5302 {
5303 goto socket_err; /* A retry is guaranteed because of tested conditions.*/
5304 }
5305
5306 /* On socket errors, close the migration socket now that we still have
5307 * the original host/port in the ARGV. Later the original command may be
5308 * rewritten to DEL and it will be too late. */
5309 if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
5310
5311 if (!copy) {
5312 /* Translate MIGRATE as DEL for replication/AOF. Note that we do
5313 * this only for the keys for which we received an acknowledgement
5314 * from the receiving Redis server, by using the del_idx index. */
5315 if (del_idx > 1) {
5316 newargv[0] = createStringObject("DEL",3);
5317 /* Note that the following call takes ownership of newargv. */
5318 replaceClientCommandVector(c,del_idx,newargv);
5319 argv_rewritten = 1;
5320 } else {
5321 /* No key transfer acknowledged, no need to rewrite as DEL. */
5322 zfree(newargv);
5323 }
5324 newargv = NULL; /* Make it safe to call zfree() on it in the future. */
5325 }
5326
5327 /* If we are here and a socket error happened, we don't want to retry.
5328 * Just signal the problem to the client, but only do it if we did not
5329 * already queue a different error reported by the destination server. */
5330 if (!error_from_target && socket_error) {
5331 may_retry = 0;
5332 goto socket_err;
5333 }
5334
5335 if (!error_from_target) {
5336 /* Success! Update the last_dbid in migrateCachedSocket, so that we can
5337 * avoid SELECT the next time if the target DB is the same. Reply +OK.
5338 *
5339 * Note: If we reached this point, even if socket_error is true
5340 * the SELECT command still succeeded (otherwise the code jumps to
5341 * the socket_err label). */
5342 cs->last_dbid = dbid;
5343 addReply(c,shared.ok);
5344 } else {
5345 /* On error we already sent it in the for loop above, and set
5346 * last_dbid to -1 to force a SELECT the next time. */
5347 }
5348
5349 sdsfree(cmd.io.buffer.ptr);
5350 zfree(ov); zfree(kv); zfree(newargv);
5351 return;
5352
5353 /* On socket errors we try to close the cached socket and try again.
5354 * It is very common for the cached socket to get closed; if just reopening
5355 * it works, there is no reason to report the error to the caller. */
5356 socket_err:
5357 /* Cleanup we want to perform in both the retry and no retry case.
5358 * Note: Closing the migrate socket will also force SELECT next time. */
5359 sdsfree(cmd.io.buffer.ptr);
5360
5361 /* If the command was rewritten as DEL and there was a socket error,
5362 * we already closed the socket earlier. While migrateCloseSocket()
5363 * is idempotent, the host/port arguments are now gone, so don't do it
5364 * again. */
5365 if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
5366 zfree(newargv);
5367 newargv = NULL; /* This will get reallocated on retry. */
5368
5369 /* Retry only if it's not a timeout and we never attempted a retry
5370 * (or the code jumping here did not set may_retry to zero). */
5371 if (errno != ETIMEDOUT && may_retry) {
5372 may_retry = 0;
5373 goto try_again;
5374 }
5375
5376 /* Cleanup we want to do if no retry is attempted. */
5377 zfree(ov); zfree(kv);
5378 addReplySds(c,
5379 sdscatprintf(sdsempty(),
5380 "-IOERR error or timeout %s to target instance\r\n",
5381 write_error ? "writing" : "reading"));
5382 return;
5383 }
5384
5385 /* -----------------------------------------------------------------------------
5386 * Cluster functions related to serving / redirecting clients
5387 * -------------------------------------------------------------------------- */
5388
5389 /* The ASKING command is required after a -ASK redirection.
5390 * The client should issue ASKING before actually sending the command to
5391 * the target instance. See the Redis Cluster specification for more
5392 * information. */
5393 void askingCommand(client *c) {
5394 if (server.cluster_enabled == 0) {
5395 addReplyError(c,"This instance has cluster support disabled");
5396 return;
5397 }
5398 c->flags |= CLIENT_ASKING;
5399 addReply(c,shared.ok);
5400 }
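/* Sketch of the resulting ASK flow during a slot migration (addresses and
 * slot number illustrative):
 *
 *   Client -> node A: GET foo
 *   Node A -> client: -ASK 12182 10.0.0.2:6380
 *   Client -> node B: ASKING               (one-shot flag set by this command)
 *   Node B -> client: +OK
 *   Client -> node B: GET foo              (served despite the open slot) */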
5401
5402 /* The READONLY command is used by clients to enter the read-only mode.
5403 * In this mode slaves will not redirect clients as long as clients use
5404 * read-only commands to access keys that are served by the slave's master. */
5405 void readonlyCommand(client *c) {
5406 if (server.cluster_enabled == 0) {
5407 addReplyError(c,"This instance has cluster support disabled");
5408 return;
5409 }
5410 c->flags |= CLIENT_READONLY;
5411 addReply(c,shared.ok);
5412 }
5413
5414 /* The READWRITE command just clears the READONLY command state. */
5415 void readwriteCommand(client *c) {
5416 c->flags &= ~CLIENT_READONLY;
5417 addReply(c,shared.ok);
5418 }
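/* Sketch of a read-only replica session (slot and addresses illustrative):
 *
 *   Client -> replica: READONLY     -> +OK
 *   Client -> replica: GET foo      -> served locally, since the replica's
 *                                      master owns the key's hash slot
 *   Client -> replica: SET foo bar  -> -MOVED 12182 <master-ip>:<master-port>
 *                                      (writes are still redirected)
 *   Client -> replica: READWRITE    -> +OK, normal redirections resume */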
5419
5420 /* Return the pointer to the cluster node that is able to serve the command.
5421 * For the function to succeed the command should only target either:
5422 *
5423 * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
5424 * 2) Multiple keys in the same hash slot, while the slot is stable (no
5425 * resharding in progress).
5426 *
5427 * On success the function returns the node that is able to serve the request.
5428 * If the node is not 'myself' a redirection must be performed. The kind of
5429 * redirection is specified by setting the integer passed by reference
5430 * 'error_code', which will be set to CLUSTER_REDIR_ASK or
5431 * CLUSTER_REDIR_MOVED.
5432 *
5433 * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
5434 *
5435 * If the command fails NULL is returned, and the reason of the failure is
5436 * provided via 'error_code', which will be set to:
5437 *
5438 * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
5439 * don't belong to the same hash slot.
5440 *
5441 * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
5442 * belonging to the same slot, but the slot is not stable (in migration or
5443 * importing state, likely because a resharding is in progress).
5444 *
5445 * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
5446 * not bound to any node. In this case the cluster global state should be
5447 * already "down" but it is fragile to rely on the update of the global state,
5448 * so we also handle it here.
5449 *
5450 * CLUSTER_REDIR_DOWN_STATE if the cluster is down but the user attempts to
5451 * execute a command that addresses one or more keys. */
5452 clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
5453 clusterNode *n = NULL;
5454 robj *firstkey = NULL;
5455 int multiple_keys = 0;
5456 multiState *ms, _ms;
5457 multiCmd mc;
5458 int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
5459
5460 /* Allow any key to be set if a module disabled cluster redirections. */
5461 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
5462 return myself;
5463
5464 /* Set error code optimistically for the base case. */
5465 if (error_code) *error_code = CLUSTER_REDIR_NONE;
5466
5467 /* Modules can turn off Redis Cluster redirection: this is useful
5468 * when writing a module that implements a completely different
5469 * distributed system. */
5470
5471 /* We handle all the cases as if they were EXEC commands, so we have
5472 * a common code path for everything */
5473 if (cmd->proc == execCommand) {
5474 /* If CLIENT_MULTI flag is not set EXEC is just going to return an
5475 * error. */
5476 if (!(c->flags & CLIENT_MULTI)) return myself;
5477 ms = &c->mstate;
5478 } else {
5479 /* In order to have a single codepath create a fake Multi State
5480 * structure if the client is not in MULTI/EXEC state, this way
5481 * we have a single codepath below. */
5482 ms = &_ms;
5483 _ms.commands = &mc;
5484 _ms.count = 1;
5485 mc.argv = argv;
5486 mc.argc = argc;
5487 mc.cmd = cmd;
5488 }
5489
5490 /* Check that all the keys are in the same hash slot, and obtain this
5491 * slot and the node associated. */
5492 for (i = 0; i < ms->count; i++) {
5493 struct redisCommand *mcmd;
5494 robj **margv;
5495 int margc, *keyindex, numkeys, j;
5496
5497 mcmd = ms->commands[i].cmd;
5498 margc = ms->commands[i].argc;
5499 margv = ms->commands[i].argv;
5500
5501 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
5502 for (j = 0; j < numkeys; j++) {
5503 robj *thiskey = margv[keyindex[j]];
5504 int thisslot = keyHashSlot((char*)thiskey->ptr,
5505 sdslen(thiskey->ptr));
5506
5507 if (firstkey == NULL) {
5508 /* This is the first key we see. Check what is the slot
5509 * and node. */
5510 firstkey = thiskey;
5511 slot = thisslot;
5512 n = server.cluster->slots[slot];
5513
5514 /* Error: If a slot is not served, we are in "cluster down"
5515 * state. However the state is yet to be updated, so this was
5516 * not trapped earlier in processCommand(). Report the same
5517 * error to the client. */
5518 if (n == NULL) {
5519 getKeysFreeResult(keyindex);
5520 if (error_code)
5521 *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
5522 return NULL;
5523 }
5524
5525 /* If we are migrating or importing this slot, we need to check
5526 * if we have all the keys in the request (the only way we
5527 * can safely serve the request, otherwise we return a TRYAGAIN
5528 * error). To do so we set the importing/migrating state and
5529 * increment a counter for every missing key. */
5530 if (n == myself &&
5531 server.cluster->migrating_slots_to[slot] != NULL)
5532 {
5533 migrating_slot = 1;
5534 } else if (server.cluster->importing_slots_from[slot] != NULL) {
5535 importing_slot = 1;
5536 }
5537 } else {
5538 /* If it is not the first key, make sure it is exactly
5539 * the same key as the first we saw. */
5540 if (!equalStringObjects(firstkey,thiskey)) {
5541 if (slot != thisslot) {
5542 /* Error: multiple keys from different slots. */
5543 getKeysFreeResult(keyindex);
5544 if (error_code)
5545 *error_code = CLUSTER_REDIR_CROSS_SLOT;
5546 return NULL;
5547 } else {
5548 /* Flag this request as one with multiple different
5549 * keys. */
5550 multiple_keys = 1;
5551 }
5552 }
5553 }
5554
5555 /* Migrating / Importing slot? Count keys we don't have. */
5556 if ((migrating_slot || importing_slot) &&
5557 lookupKeyRead(&server.db[0],thiskey) == NULL)
5558 {
5559 missing_keys++;
5560 }
5561 }
5562 getKeysFreeResult(keyindex);
5563 }
5564
5565 /* No key at all in the command? Then we can serve the request
5566 * without redirections or errors in all cases. */
5567 if (n == NULL) return myself;
5568
5569 /* Cluster is globally down but we got keys? We can't serve the request. */
5570 if (server.cluster->state != CLUSTER_OK) {
5571 if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
5572 return NULL;
5573 }
5574
5575 /* Return the hashslot by reference. */
5576 if (hashslot) *hashslot = slot;
5577
5578 /* MIGRATE always works in the context of the local node if the slot
5579 * is open (migrating or importing state). We need to be able to freely
5580 * move keys among instances in this case. */
5581 if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
5582 return myself;
5583
5584 /* If we don't have all the keys and we are migrating the slot, send
5585 * an ASK redirection. */
5586 if (migrating_slot && missing_keys) {
5587 if (error_code) *error_code = CLUSTER_REDIR_ASK;
5588 return server.cluster->migrating_slots_to[slot];
5589 }
5590
5591 /* If we are receiving the slot, and the client correctly flagged the
5592 * request as "ASKING", we can serve the request. However if the request
5593 * involves multiple keys and we don't have them all, the only option is
5594 * to send a TRYAGAIN error. */
5595 if (importing_slot &&
5596 (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
5597 {
5598 if (multiple_keys && missing_keys) {
5599 if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
5600 return NULL;
5601 } else {
5602 return myself;
5603 }
5604 }
5605
5606 /* Handle the read-only client case reading from a slave: if this
5607 * node is a slave and the request is about a hash slot our master
5608 * is serving, we can reply without redirection. */
5609 if (c->flags & CLIENT_READONLY &&
5610 (cmd->flags & CMD_READONLY || cmd->proc == evalCommand ||
5611 cmd->proc == evalShaCommand) &&
5612 nodeIsSlave(myself) &&
5613 myself->slaveof == n)
5614 {
5615 return myself;
5616 }
5617
5618 /* Base case: just return the right node. However if this node is not
5619 * myself, set error_code to MOVED since we need to issue a redirection. */
5620 if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
5621 return n;
5622 }
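/* A minimal sketch of how a caller typically drives getNodeByQuery(),
 * mirroring the dispatch logic in processCommand() (variable names are
 * illustrative):
 *
 *   int hashslot, error_code;
 *   clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
 *                                   &hashslot,&error_code);
 *   if (n == NULL || n != server.cluster->myself) {
 *       clusterRedirectClient(c,n,hashslot,error_code);
 *       return;   (reply already sent; do not execute the command locally)
 *   }
 *   (otherwise fall through and execute the command on this node)
 */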
5623
5624 /* Send the client the right redirection code, according to error_code
5625 * that should be set to one of CLUSTER_REDIR_* macros.
5626 *
5627 * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
5628 * are used, then the node 'n' should not be NULL, but should be the
5629 * node we want to mention in the redirection. Moreover hashslot should
5630 * be set to the hash slot that caused the redirection. */
5631 void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
5632 if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
5633 addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
5634 } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
5635 /* The request spans multiple keys in the same slot,
5636 * but the slot is not "stable" currently as there is
5637 * a migration or import in progress. */
5638 addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
5639 } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
5640 addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
5641 } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
5642 addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
5643 } else if (error_code == CLUSTER_REDIR_MOVED ||
5644 error_code == CLUSTER_REDIR_ASK)
5645 {
5646 addReplySds(c,sdscatprintf(sdsempty(),
5647 "-%s %d %s:%d\r\n",
5648 (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
5649 hashslot,n->ip,n->port));
5650 } else {
5651 serverPanic("getNodeByQuery() unknown error.");
5652 }
5653 }
5654
5655 /* This function is called by the function processing clients incrementally
5656 * to detect timeouts, in order to handle the following case:
5657 *
5658 * 1) A client blocks with BLPOP or similar blocking operation.
5659 * 2) The master migrates the hash slot elsewhere or turns into a slave.
5660 * 3) The client may remain blocked forever (or up to the max timeout time)
5661 * waiting for a key change that will never happen.
5662 *
5663 * If the client is found to be blocked on a hash slot this node no
5664 * longer handles, the client is sent a redirection error, and the function
5665 * returns 1. Otherwise 0 is returned and no operation is performed. */
5666 int clusterRedirectBlockedClientIfNeeded(client *c) {
5667 if (c->flags & CLIENT_BLOCKED &&
5668 (c->btype == BLOCKED_LIST ||
5669 c->btype == BLOCKED_ZSET ||
5670 c->btype == BLOCKED_STREAM))
5671 {
5672 dictEntry *de;
5673 dictIterator *di;
5674
5675 /* If the cluster is down, unblock the client with the right error. */
5676 if (server.cluster->state == CLUSTER_FAIL) {
5677 clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
5678 return 1;
5679 }
5680
5681 /* All keys must belong to the same slot, so check first key only. */
5682 di = dictGetIterator(c->bpop.keys);
5683 if ((de = dictNext(di)) != NULL) {
5684 robj *key = dictGetKey(de);
5685 int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
5686 clusterNode *node = server.cluster->slots[slot];
5687
5688 /* We send an error and unblock the client if:
5689 * 1) The slot is unassigned, emitting a cluster down error.
5690 * 2) The slot is not handled by this node, nor being imported. */
5691 if (node != myself &&
5692 server.cluster->importing_slots_from[slot] == NULL)
5693 {
5694 if (node == NULL) {
5695 clusterRedirectClient(c,NULL,0,
5696 CLUSTER_REDIR_DOWN_UNBOUND);
5697 } else {
5698 clusterRedirectClient(c,node,slot,
5699 CLUSTER_REDIR_MOVED);
5700 }
5701 dictReleaseIterator(di);
5702 return 1;
5703 }
5704 }
5705 dictReleaseIterator(di);
5706 }
5707 return 0;
5708 }
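/* Example scenario (illustrative): a client is blocked on BLPOP mylist while
 * the hash slot of "mylist" is resharded to another master. On the next scan
 * this function notices the slot moved away and unblocks the client with a
 * -MOVED redirection instead of leaving it to wait for its full timeout. */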
5709