1 /* Redis Cluster implementation.
2 *
3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "server.h"
32 #include "cluster.h"
33 #include "endianconv.h"
34
35 #include <sys/types.h>
36 #include <sys/socket.h>
37 #include <arpa/inet.h>
38 #include <fcntl.h>
39 #include <unistd.h>
41 #include <sys/stat.h>
42 #include <sys/file.h>
43 #include <math.h>
44
45 /* A global reference to myself is handy to make the code clearer.
46 * Myself always points to server.cluster->myself, that is, the clusterNode
47 * that represents this node. */
48 clusterNode *myself = NULL;
49
50 clusterNode *createClusterNode(char *nodename, int flags);
51 int clusterAddNode(clusterNode *node);
52 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
53 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
54 void clusterSendPing(clusterLink *link, int type);
55 void clusterSendFail(char *nodename);
56 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
57 void clusterUpdateState(void);
58 int clusterNodeGetSlotBit(clusterNode *n, int slot);
59 sds clusterGenNodesDescription(int filter);
60 clusterNode *clusterLookupNode(char *name);
61 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
62 int clusterAddSlot(clusterNode *n, int slot);
63 int clusterDelSlot(int slot);
64 int clusterDelNodeSlots(clusterNode *node);
65 int clusterNodeSetSlotBit(clusterNode *n, int slot);
66 void clusterSetMaster(clusterNode *n);
67 void clusterHandleSlaveFailover(void);
68 void clusterHandleSlaveMigration(int max_slaves);
69 int bitmapTestBit(unsigned char *bitmap, int pos);
70 void clusterDoBeforeSleep(int flags);
71 void clusterSendUpdate(clusterLink *link, clusterNode *node);
72 void resetManualFailover(void);
73 void clusterCloseAllSlots(void);
74 void clusterSetNodeAsMaster(clusterNode *n);
75 void clusterDelNode(clusterNode *delnode);
76 sds representClusterNodeFlags(sds ci, uint16_t flags);
77 uint64_t clusterGetMaxEpoch(void);
78 int clusterBumpConfigEpochWithoutConsensus(void);
79
80 /* -----------------------------------------------------------------------------
81 * Initialization
82 * -------------------------------------------------------------------------- */
83
84 /* Load the cluster config from 'filename'.
85 *
86 * If the file does not exist or is zero-length (this may happen because
87 * when we lock the nodes.conf file, we create a zero-length one for the
88 * sake of locking if it does not already exist), C_ERR is returned.
89 * If the configuration was loaded from the file, C_OK is returned. */
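/* For reference, each regular line of nodes.conf has the same layout as a
 * line of CLUSTER NODES output:
 *
 *   <id> <ip:port> <flags> <master-id|-> <ping-sent> <pong-recv> <config-epoch> <link-state> [<slot> ...]
 *
 * The two lines below are purely illustrative (hypothetical IDs, addresses
 * and epochs), not taken from a real deployment:
 *
 *   e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001 myself,master - 0 0 1 connected 0-5460
 *   07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected
 */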
90 int clusterLoadConfig(char *filename) {
91 FILE *fp = fopen(filename,"r");
92 struct stat sb;
93 char *line;
94 int maxline, j;
95
96 if (fp == NULL) {
97 if (errno == ENOENT) {
98 return C_ERR;
99 } else {
100 serverLog(LL_WARNING,
101 "Loading the cluster node config from %s: %s",
102 filename, strerror(errno));
103 exit(1);
104 }
105 }
106
107 /* Check if the file is zero-length: if so return C_ERR to signal
108 * we have to write the config. */
109 if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
110 fclose(fp);
111 return C_ERR;
112 }
113
114 /* Parse the file. Note that single lines of the cluster config file can
115 * be really long as they include all the hash slots of the node.
116 * This means that in the worst possible case, half of the Redis slots will
117 * be present in a single line, possibly in importing or migrating state,
118 * together with the node ID of the sender/receiver.
119 *
120 * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
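/* As a rough sanity check on the size: with the default CLUSTER_SLOTS of
 * 16384 this works out to 1024 + 16384*128 = 2,098,176 bytes, i.e. about
 * 2 MB for the line buffer. */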
121 maxline = 1024+CLUSTER_SLOTS*128;
122 line = zmalloc(maxline);
123 while(fgets(line,maxline,fp) != NULL) {
124 int argc;
125 sds *argv;
126 clusterNode *n, *master;
127 char *p, *s;
128
129 /* Skip blank lines, they can be created either by users manually
130 * editing nodes.conf or by the config writing process if stopped
131 * before the truncate() call. */
132 if (line[0] == '\n') continue;
133
134 /* Split the line into arguments for processing. */
135 argv = sdssplitargs(line,&argc);
136 if (argv == NULL) goto fmterr;
137
138 /* Handle the special "vars" line. Don't pretend it is the last
139 * line even if it actually is when generated by Redis. */
140 if (strcasecmp(argv[0],"vars") == 0) {
141 for (j = 1; j < argc; j += 2) {
142 if (strcasecmp(argv[j],"currentEpoch") == 0) {
143 server.cluster->currentEpoch =
144 strtoull(argv[j+1],NULL,10);
145 } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
146 server.cluster->lastVoteEpoch =
147 strtoull(argv[j+1],NULL,10);
148 } else {
149 serverLog(LL_WARNING,
150 "Skipping unknown cluster config variable '%s'",
151 argv[j]);
152 }
153 }
154 sdsfreesplitres(argv,argc);
155 continue;
156 }
157
158 /* Regular config lines have at least eight fields */
159 if (argc < 8) goto fmterr;
160
161 /* Create this node if it does not exist */
162 n = clusterLookupNode(argv[0]);
163 if (!n) {
164 n = createClusterNode(argv[0],0);
165 clusterAddNode(n);
166 }
167 /* Address and port */
168 if ((p = strrchr(argv[1],':')) == NULL) goto fmterr;
169 *p = '\0';
170 memcpy(n->ip,argv[1],strlen(argv[1])+1);
171 n->port = atoi(p+1);
172
173 /* Parse flags */
174 p = s = argv[2];
175 while(p) {
176 p = strchr(s,',');
177 if (p) *p = '\0';
178 if (!strcasecmp(s,"myself")) {
179 serverAssert(server.cluster->myself == NULL);
180 myself = server.cluster->myself = n;
181 n->flags |= CLUSTER_NODE_MYSELF;
182 } else if (!strcasecmp(s,"master")) {
183 n->flags |= CLUSTER_NODE_MASTER;
184 } else if (!strcasecmp(s,"slave")) {
185 n->flags |= CLUSTER_NODE_SLAVE;
186 } else if (!strcasecmp(s,"fail?")) {
187 n->flags |= CLUSTER_NODE_PFAIL;
188 } else if (!strcasecmp(s,"fail")) {
189 n->flags |= CLUSTER_NODE_FAIL;
190 n->fail_time = mstime();
191 } else if (!strcasecmp(s,"handshake")) {
192 n->flags |= CLUSTER_NODE_HANDSHAKE;
193 } else if (!strcasecmp(s,"noaddr")) {
194 n->flags |= CLUSTER_NODE_NOADDR;
195 } else if (!strcasecmp(s,"noflags")) {
196 /* nothing to do */
197 } else {
198 serverPanic("Unknown flag in redis cluster config file");
199 }
200 if (p) s = p+1;
201 }
202
203 /* Get master if any. Set the master and populate master's
204 * slave list. */
205 if (argv[3][0] != '-') {
206 master = clusterLookupNode(argv[3]);
207 if (!master) {
208 master = createClusterNode(argv[3],0);
209 clusterAddNode(master);
210 }
211 n->slaveof = master;
212 clusterNodeAddSlave(master,n);
213 }
214
215 /* Set ping sent / pong received timestamps */
216 if (atoi(argv[4])) n->ping_sent = mstime();
217 if (atoi(argv[5])) n->pong_received = mstime();
218
219 /* Set configEpoch for this node. */
220 n->configEpoch = strtoull(argv[6],NULL,10);
221
222 /* Populate hash slots served by this instance. */
223 for (j = 8; j < argc; j++) {
224 int start, stop;
225
226 if (argv[j][0] == '[') {
227 /* Here we handle migrating / importing slots */
228 int slot;
229 char direction;
230 clusterNode *cn;
231
232 p = strchr(argv[j],'-');
233 serverAssert(p != NULL);
234 *p = '\0';
235 direction = p[1]; /* Either '>' or '<' */
236 slot = atoi(argv[j]+1);
237 p += 3;
238 cn = clusterLookupNode(p);
239 if (!cn) {
240 cn = createClusterNode(p,0);
241 clusterAddNode(cn);
242 }
243 if (direction == '>') {
244 server.cluster->migrating_slots_to[slot] = cn;
245 } else {
246 server.cluster->importing_slots_from[slot] = cn;
247 }
248 continue;
249 } else if ((p = strchr(argv[j],'-')) != NULL) {
250 *p = '\0';
251 start = atoi(argv[j]);
252 stop = atoi(p+1);
253 } else {
254 start = stop = atoi(argv[j]);
255 }
256 while(start <= stop) clusterAddSlot(n, start++);
257 }
258
259 sdsfreesplitres(argv,argc);
260 }
261 /* Config sanity check */
262 if (server.cluster->myself == NULL) goto fmterr;
263
264 zfree(line);
265 fclose(fp);
266
267 serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);
268
269 /* Something that should never happen: currentEpoch smaller than
270 * the max epoch found in the nodes configuration. However we handle this
271 * as some form of protection against manual editing of critical files. */
272 if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
273 server.cluster->currentEpoch = clusterGetMaxEpoch();
274 }
275 return C_OK;
276
277 fmterr:
278 serverLog(LL_WARNING,
279 "Unrecoverable error: corrupted cluster config file.");
280 zfree(line);
281 if (fp) fclose(fp);
282 exit(1);
283 }
284
285 /* Cluster node configuration is exactly the same as CLUSTER NODES output.
286 *
287 * This function writes the node config and returns 0; on error -1
288 * is returned.
289 *
290 * Note: we need to write the file in an atomic way from the point of view
291 * of the POSIX filesystem semantics, so that if the server is stopped
292 * or crashes during the write, we'll end with either the old file or the
293 * new one. Since we have the full payload to write available we can use
294 * a single write to write the whole file. If the pre-existing file was
295 * bigger we pad our payload with newlines that are anyway ignored and truncate
296 * the file afterward. */
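/* A worked example with assumed sizes: if the pre-existing nodes.conf is
 * 500 bytes and the new payload is 350 bytes, the sds buffer is grown to
 * 500 bytes, the last 150 bytes are set to '\n', the whole 500 bytes are
 * written with a single write(), and finally ftruncate() trims the file
 * back to 350 bytes. */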
297 int clusterSaveConfig(int do_fsync) {
298 sds ci;
299 size_t content_size;
300 struct stat sb;
301 int fd;
302
303 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
304
305 /* Get the nodes description and concatenate our "vars" directive to
306 * save currentEpoch and lastVoteEpoch. */
307 ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
308 ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
309 (unsigned long long) server.cluster->currentEpoch,
310 (unsigned long long) server.cluster->lastVoteEpoch);
311 content_size = sdslen(ci);
312
313 if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
314 == -1) goto err;
315
316 /* Pad the new payload if the existing file length is greater. */
317 if (fstat(fd,&sb) != -1) {
318 if (sb.st_size > (off_t)content_size) {
319 ci = sdsgrowzero(ci,sb.st_size);
320 memset(ci+content_size,'\n',sb.st_size-content_size);
321 }
322 }
323 if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
324 if (do_fsync) {
325 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
326 fsync(fd);
327 }
328
329 /* Truncate the file if needed to remove the final \n padding that
330 * is just garbage. */
331 if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
332 /* ftruncate() failing is not a critical error. */
333 }
334 close(fd);
335 sdsfree(ci);
336 return 0;
337
338 err:
339 if (fd != -1) close(fd);
340 sdsfree(ci);
341 return -1;
342 }
343
344 void clusterSaveConfigOrDie(int do_fsync) {
345 if (clusterSaveConfig(do_fsync) == -1) {
346 serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
347 exit(1);
348 }
349 }
350
351 /* Lock the cluster config using flock(), and leak the file descriptor used to
352 * acquire the lock so that the file will be locked forever.
353 *
354 * This works because we always update nodes.conf with a new version
355 * in-place, reopening the file, and writing to it in place (later adjusting
356 * the length with ftruncate()).
357 *
358 * On success C_OK is returned, otherwise an error is logged and
359 * the function returns C_ERR to signal a lock was not acquired. */
360 int clusterLockConfig(char *filename) {
361 /* flock() does not exist on Solaris
362 * and a fcntl-based solution won't help, as we constantly re-open that file,
363 * which will release _all_ locks anyway
364 */
365 #if !defined(__sun)
366 /* To lock it, we need to open the file in a way it is created if
367 * it does not exist, otherwise there is a race condition with other
368 * processes. */
369 int fd = open(filename,O_WRONLY|O_CREAT,0644);
370 if (fd == -1) {
371 serverLog(LL_WARNING,
372 "Can't open %s in order to acquire a lock: %s",
373 filename, strerror(errno));
374 return C_ERR;
375 }
376
377 if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
378 if (errno == EWOULDBLOCK) {
379 serverLog(LL_WARNING,
380 "Sorry, the cluster configuration file %s is already used "
381 "by a different Redis Cluster node. Please make sure that "
382 "different nodes use different cluster configuration "
383 "files.", filename);
384 } else {
385 serverLog(LL_WARNING,
386 "Impossible to lock %s: %s", filename, strerror(errno));
387 }
388 close(fd);
389 return C_ERR;
390 }
391 /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
392 * lock to the file as long as the process exists. */
393 #endif /* __sun */
394
395 return C_OK;
396 }
397
398 void clusterInit(void) {
399 int saveconf = 0;
400
401 server.cluster = zmalloc(sizeof(clusterState));
402 server.cluster->myself = NULL;
403 server.cluster->currentEpoch = 0;
404 server.cluster->state = CLUSTER_FAIL;
405 server.cluster->size = 1;
406 server.cluster->todo_before_sleep = 0;
407 server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
408 server.cluster->nodes_black_list =
409 dictCreate(&clusterNodesBlackListDictType,NULL);
410 server.cluster->failover_auth_time = 0;
411 server.cluster->failover_auth_count = 0;
412 server.cluster->failover_auth_rank = 0;
413 server.cluster->failover_auth_epoch = 0;
414 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
415 server.cluster->lastVoteEpoch = 0;
416 server.cluster->stats_bus_messages_sent = 0;
417 server.cluster->stats_bus_messages_received = 0;
418 memset(server.cluster->slots,0, sizeof(server.cluster->slots));
419 clusterCloseAllSlots();
420
421 /* Lock the cluster config file to make sure every node uses
422 * its own nodes.conf. */
423 if (clusterLockConfig(server.cluster_configfile) == C_ERR)
424 exit(1);
425
426 /* Load or create a new nodes configuration. */
427 if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
428 /* No configuration found. We will just use the random name provided
429 * by the createClusterNode() function. */
430 myself = server.cluster->myself =
431 createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
432 serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
433 myself->name);
434 clusterAddNode(myself);
435 saveconf = 1;
436 }
437 if (saveconf) clusterSaveConfigOrDie(1);
438
439 /* We need a listening TCP port for our cluster messaging needs. */
440 server.cfd_count = 0;
441
442 /* Port sanity check II
443 * The other handshake port check is triggered too late to stop
444 * us from trying to use a too-high cluster port number. */
445 if (server.port > (65535-CLUSTER_PORT_INCR)) {
446 serverLog(LL_WARNING, "Redis port number too high. "
447 "Cluster communication port is 10,000 port "
448 "numbers higher than your Redis port. "
449 "Your Redis port number must be "
450 "lower than 55535.");
451 exit(1);
452 }
453
454 if (listenToPort(server.port+CLUSTER_PORT_INCR,
455 server.cfd,&server.cfd_count) == C_ERR)
456 {
457 exit(1);
458 } else {
459 int j;
460
461 for (j = 0; j < server.cfd_count; j++) {
462 if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
463 clusterAcceptHandler, NULL) == AE_ERR)
464 serverPanic("Unrecoverable error creating Redis Cluster "
465 "file event.");
466 }
467 }
468
469 /* The slots -> keys map is a sorted set. Init it. */
470 server.cluster->slots_to_keys = zslCreate();
471
472 /* Set myself->port to my listening port, we'll just need to discover
473 * the IP address via MEET messages. */
474 myself->port = server.port;
475
476 server.cluster->mf_end = 0;
477 resetManualFailover();
478 }
479
480 /* Reset a node performing a soft or hard reset:
481 *
482 * 1) All other nodes are forgotten.
483 * 2) All the assigned / open slots are released.
484 * 3) If the node is a slave, it turns into a master.
485 * 4) Only for hard reset: a new Node ID is generated.
486 * 5) Only for hard reset: currentEpoch and configEpoch are set to 0.
487 * 6) The new configuration is saved and the cluster state updated.
488 * 7) If the node was a slave, the whole data set is flushed away. */
489 void clusterReset(int hard) {
490 dictIterator *di;
491 dictEntry *de;
492 int j;
493
494 /* Turn into master. */
495 if (nodeIsSlave(myself)) {
496 clusterSetNodeAsMaster(myself);
497 replicationUnsetMaster();
498 emptyDb(NULL);
499 }
500
501 /* Close slots, reset manual failover state. */
502 clusterCloseAllSlots();
503 resetManualFailover();
504
505 /* Unassign all the slots. */
506 for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);
507
508 /* Forget all the nodes, but myself. */
509 di = dictGetSafeIterator(server.cluster->nodes);
510 while((de = dictNext(di)) != NULL) {
511 clusterNode *node = dictGetVal(de);
512
513 if (node == myself) continue;
514 clusterDelNode(node);
515 }
516 dictReleaseIterator(di);
517
518 /* Hard reset only: set epochs to 0, change node ID. */
519 if (hard) {
520 sds oldname;
521
522 server.cluster->currentEpoch = 0;
523 server.cluster->lastVoteEpoch = 0;
524 myself->configEpoch = 0;
525 serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");
526
527 /* To change the Node ID we need to remove the old name from the
528 * nodes table, change the ID, and re-add back with new name. */
529 oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
530 dictDelete(server.cluster->nodes,oldname);
531 sdsfree(oldname);
532 getRandomHexChars(myself->name, CLUSTER_NAMELEN);
533 clusterAddNode(myself);
534 serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
535 }
536
537 /* Make sure to persist the new config and update the state. */
538 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
539 CLUSTER_TODO_UPDATE_STATE|
540 CLUSTER_TODO_FSYNC_CONFIG);
541 }
542
543 /* -----------------------------------------------------------------------------
544 * CLUSTER communication link
545 * -------------------------------------------------------------------------- */
546
547 clusterLink *createClusterLink(clusterNode *node) {
548 clusterLink *link = zmalloc(sizeof(*link));
549 link->ctime = mstime();
550 link->sndbuf = sdsempty();
551 link->rcvbuf = sdsempty();
552 link->node = node;
553 link->fd = -1;
554 return link;
555 }
556
557 /* Free a cluster link, but do not free the associated node, of course.
558 * This function will just make sure that the original node associated
559 * with this link has its 'link' field set to NULL. */
560 void freeClusterLink(clusterLink *link) {
561 if (link->fd != -1) {
562 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
563 aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
564 }
565 sdsfree(link->sndbuf);
566 sdsfree(link->rcvbuf);
567 if (link->node)
568 link->node->link = NULL;
569 close(link->fd);
570 zfree(link);
571 }
572
573 #define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
574 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
575 int cport, cfd;
576 int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
577 char cip[NET_IP_STR_LEN];
578 clusterLink *link;
579 UNUSED(el);
580 UNUSED(mask);
581 UNUSED(privdata);
582
583 /* If the server is starting up, don't accept cluster connections:
584 * UPDATE messages may interact with the database content. */
585 if (server.masterhost == NULL && server.loading) return;
586
587 while(max--) {
588 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
589 if (cfd == ANET_ERR) {
590 if (errno != EWOULDBLOCK)
591 serverLog(LL_VERBOSE,
592 "Error accepting cluster node: %s", server.neterr);
593 return;
594 }
595 anetNonBlock(NULL,cfd);
596 anetEnableTcpNoDelay(NULL,cfd);
597
598 /* Use non-blocking I/O for cluster messages. */
599 serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
600 /* Create a link object we use to handle the connection.
601 * It gets passed to the readable handler when data is available.
602 * Initially the link->node pointer is set to NULL as we don't know
603 * which node it is, but the right node is referenced once we know the
604 * node identity. */
605 link = createClusterLink(NULL);
606 link->fd = cfd;
607 aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
608 }
609 }
610
611 /* -----------------------------------------------------------------------------
612 * Key space handling
613 * -------------------------------------------------------------------------- */
614
615 /* We have 16384 hash slots. The hash slot of a given key is obtained
616 * as the least significant 14 bits of the crc16 of the key.
617 *
618 * However if the key contains the {...} pattern, only the part between
619 * { and } is hashed. This may be useful in the future to force certain
620 * keys to be in the same node (assuming no resharding is in progress). */
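/* For example (hypothetical keys, lengths as passed by the caller):
 *
 *   keyHashSlot("foo",3);                    hashes the whole key "foo"
 *   keyHashSlot("{user1000}.following",20);  hashes only "user1000"
 *   keyHashSlot("{user1000}.followers",20);  hashes only "user1000"
 *
 * so the last two keys are guaranteed to map to the same hash slot. */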
621 unsigned int keyHashSlot(char *key, int keylen) {
622 int s, e; /* start-end indexes of { and } */
623
624 for (s = 0; s < keylen; s++)
625 if (key[s] == '{') break;
626
627 /* No '{' ? Hash the whole key. This is the base case. */
628 if (s == keylen) return crc16(key,keylen) & 0x3FFF;
629
630 /* '{' found? Check if we have the corresponding '}'. */
631 for (e = s+1; e < keylen; e++)
632 if (key[e] == '}') break;
633
634 /* No '}' or nothing between {} ? Hash the whole key. */
635 if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
636
637 /* If we are here there is both a { and a } on its right. Hash
638 * what is in the middle between { and }. */
639 return crc16(key+s+1,e-s-1) & 0x3FFF;
640 }
641
642 /* -----------------------------------------------------------------------------
643 * CLUSTER node API
644 * -------------------------------------------------------------------------- */
645
646 /* Create a new cluster node, with the specified flags.
647 * If "nodename" is NULL this is considered a first handshake and a random
648 * node name is assigned to this node (it will be fixed later when we
649 * receive the first pong).
650 *
651 * The node is created and returned to the user, but it is not automatically
652 * added to the nodes hash table. */
653 clusterNode *createClusterNode(char *nodename, int flags) {
654 clusterNode *node = zmalloc(sizeof(*node));
655
656 if (nodename)
657 memcpy(node->name, nodename, CLUSTER_NAMELEN);
658 else
659 getRandomHexChars(node->name, CLUSTER_NAMELEN);
660 node->ctime = mstime();
661 node->configEpoch = 0;
662 node->flags = flags;
663 memset(node->slots,0,sizeof(node->slots));
664 node->numslots = 0;
665 node->numslaves = 0;
666 node->slaves = NULL;
667 node->slaveof = NULL;
668 node->ping_sent = node->pong_received = 0;
669 node->fail_time = 0;
670 node->link = NULL;
671 memset(node->ip,0,sizeof(node->ip));
672 node->port = 0;
673 node->fail_reports = listCreate();
674 node->voted_time = 0;
675 node->orphaned_time = 0;
676 node->repl_offset_time = 0;
677 node->repl_offset = 0;
678 listSetFreeMethod(node->fail_reports,zfree);
679 return node;
680 }
681
682 /* This function is called every time we get a failure report from a node.
683 * The side effect is to populate the fail_reports list (or to update
684 * the timestamp of an existing report).
685 *
686 * 'failing' is the node that is in failure state according to the
687 * 'sender' node.
688 *
689 * The function returns 0 if it just updates a timestamp of an existing
690 * failure report from the same sender. 1 is returned if a new failure
691 * report is created. */
692 int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
693 list *l = failing->fail_reports;
694 listNode *ln;
695 listIter li;
696 clusterNodeFailReport *fr;
697
698 /* If a failure report from the same sender already exists, just update
699 * the timestamp. */
700 listRewind(l,&li);
701 while ((ln = listNext(&li)) != NULL) {
702 fr = ln->value;
703 if (fr->node == sender) {
704 fr->time = mstime();
705 return 0;
706 }
707 }
708
709 /* Otherwise create a new report. */
710 fr = zmalloc(sizeof(*fr));
711 fr->node = sender;
712 fr->time = mstime();
713 listAddNodeTail(l,fr);
714 return 1;
715 }
716
717 /* Remove failure reports that are too old, where too old means reasonably
718 * older than the global node timeout. Note that anyway for a node to be
719 * flagged as FAIL we need to have a local PFAIL state that is at least
720 * older than the global node timeout, so we don't just trust the number
721 * of failure reports from other nodes. */
722 void clusterNodeCleanupFailureReports(clusterNode *node) {
723 list *l = node->fail_reports;
724 listNode *ln;
725 listIter li;
726 clusterNodeFailReport *fr;
727 mstime_t maxtime = server.cluster_node_timeout *
728 CLUSTER_FAIL_REPORT_VALIDITY_MULT;
729 mstime_t now = mstime();
730
731 listRewind(l,&li);
732 while ((ln = listNext(&li)) != NULL) {
733 fr = ln->value;
734 if (now - fr->time > maxtime) listDelNode(l,ln);
735 }
736 }
737
738 /* Remove the failing report for 'node' if it was previously considered
739 * failing by 'sender'. This function is called when a node informs us via
740 * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
741 *
742 * Note that this function is called relatively often as it gets called even
743 * when there are no nodes failing, and is O(N), however when the cluster is
744 * fine the failure reports list is empty so the function runs in constant
745 * time.
746 *
747 * The function returns 1 if the failure report was found and removed.
748 * Otherwise 0 is returned. */
749 int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
750 list *l = node->fail_reports;
751 listNode *ln;
752 listIter li;
753 clusterNodeFailReport *fr;
754
755 /* Search for a failure report from this sender. */
756 listRewind(l,&li);
757 while ((ln = listNext(&li)) != NULL) {
758 fr = ln->value;
759 if (fr->node == sender) break;
760 }
761 if (!ln) return 0; /* No failure report from this sender. */
762
763 /* Remove the failure report. */
764 listDelNode(l,ln);
765 clusterNodeCleanupFailureReports(node);
766 return 1;
767 }
768
769 /* Return the number of external nodes that believe 'node' is failing,
770 * not including this node, which may itself have a PFAIL or FAIL state
771 * for this node as well. */
772 int clusterNodeFailureReportsCount(clusterNode *node) {
773 clusterNodeCleanupFailureReports(node);
774 return listLength(node->fail_reports);
775 }
776
777 int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
778 int j;
779
780 for (j = 0; j < master->numslaves; j++) {
781 if (master->slaves[j] == slave) {
782 if ((j+1) < master->numslaves) {
783 int remaining_slaves = (master->numslaves - j) - 1;
784 memmove(master->slaves+j,master->slaves+(j+1),
785 (sizeof(*master->slaves) * remaining_slaves));
786 }
787 master->numslaves--;
788 if (master->numslaves == 0)
789 master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
790 return C_OK;
791 }
792 }
793 return C_ERR;
794 }
795
796 int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
797 int j;
798
799 /* If it's already a slave, don't add it again. */
800 for (j = 0; j < master->numslaves; j++)
801 if (master->slaves[j] == slave) return C_ERR;
802 master->slaves = zrealloc(master->slaves,
803 sizeof(clusterNode*)*(master->numslaves+1));
804 master->slaves[master->numslaves] = slave;
805 master->numslaves++;
806 master->flags |= CLUSTER_NODE_MIGRATE_TO;
807 return C_OK;
808 }
809
810 int clusterCountNonFailingSlaves(clusterNode *n) {
811 int j, okslaves = 0;
812
813 for (j = 0; j < n->numslaves; j++)
814 if (!nodeFailed(n->slaves[j])) okslaves++;
815 return okslaves;
816 }
817
818 /* Low level cleanup of the node structure. Only called by clusterDelNode(). */
819 void freeClusterNode(clusterNode *n) {
820 sds nodename;
821 int j;
822
823 /* If the node has associated slaves, we have to set
824 * all the slaves->slaveof fields to NULL (unknown). */
825 for (j = 0; j < n->numslaves; j++)
826 n->slaves[j]->slaveof = NULL;
827
828 /* Remove this node from the list of slaves of its master. */
829 if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);
830
831 /* Unlink from the set of nodes. */
832 nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
833 serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
834 sdsfree(nodename);
835
836 /* Release link and associated data structures. */
837 if (n->link) freeClusterLink(n->link);
838 listRelease(n->fail_reports);
839 zfree(n->slaves);
840 zfree(n);
841 }
842
843 /* Add a node to the nodes hash table */
844 int clusterAddNode(clusterNode *node) {
845 int retval;
846
847 retval = dictAdd(server.cluster->nodes,
848 sdsnewlen(node->name,CLUSTER_NAMELEN), node);
849 return (retval == DICT_OK) ? C_OK : C_ERR;
850 }
851
852 /* Remove a node from the cluster. The function performs the high level
853 * cleanup, calling freeClusterNode() for the low level cleanup.
854 * Here we do the following:
855 *
856 * 1) Mark all the slots handled by it as unassigned.
857 * 2) Remove all the failure reports sent by this node and referenced by
858 * other nodes.
859 * 3) Free the node with freeClusterNode() that will in turn remove it
860 * from the hash table and from the list of slaves of its master, if
861 * it is a slave node.
862 */
863 void clusterDelNode(clusterNode *delnode) {
864 int j;
865 dictIterator *di;
866 dictEntry *de;
867
868 /* 1) Mark slots as unassigned. */
869 for (j = 0; j < CLUSTER_SLOTS; j++) {
870 if (server.cluster->importing_slots_from[j] == delnode)
871 server.cluster->importing_slots_from[j] = NULL;
872 if (server.cluster->migrating_slots_to[j] == delnode)
873 server.cluster->migrating_slots_to[j] = NULL;
874 if (server.cluster->slots[j] == delnode)
875 clusterDelSlot(j);
876 }
877
878 /* 2) Remove failure reports. */
879 di = dictGetSafeIterator(server.cluster->nodes);
880 while((de = dictNext(di)) != NULL) {
881 clusterNode *node = dictGetVal(de);
882
883 if (node == delnode) continue;
884 clusterNodeDelFailureReport(node,delnode);
885 }
886 dictReleaseIterator(di);
887
888 /* 3) Free the node, unlinking it from the cluster. */
889 freeClusterNode(delnode);
890 }
891
892 /* Node lookup by name */
893 clusterNode *clusterLookupNode(char *name) {
894 sds s = sdsnewlen(name, CLUSTER_NAMELEN);
895 dictEntry *de;
896
897 de = dictFind(server.cluster->nodes,s);
898 sdsfree(s);
899 if (de == NULL) return NULL;
900 return dictGetVal(de);
901 }
902
903 /* This is only used after the handshake. When we connect a given IP/PORT
904 * as a result of CLUSTER MEET we don't have the node name yet, so we
905 * pick a random one, and will fix it when we receive the PONG request using
906 * this function. */
907 void clusterRenameNode(clusterNode *node, char *newname) {
908 int retval;
909 sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);
910
911 serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
912 node->name, newname);
913 retval = dictDelete(server.cluster->nodes, s);
914 sdsfree(s);
915 serverAssert(retval == DICT_OK);
916 memcpy(node->name, newname, CLUSTER_NAMELEN);
917 clusterAddNode(node);
918 }
919
920 /* -----------------------------------------------------------------------------
921 * CLUSTER config epoch handling
922 * -------------------------------------------------------------------------- */
923
924 /* Return the greatest configEpoch found in the cluster, or the current
925 * epoch if greater than any node configEpoch. */
926 uint64_t clusterGetMaxEpoch(void) {
927 uint64_t max = 0;
928 dictIterator *di;
929 dictEntry *de;
930
931 di = dictGetSafeIterator(server.cluster->nodes);
932 while((de = dictNext(di)) != NULL) {
933 clusterNode *node = dictGetVal(de);
934 if (node->configEpoch > max) max = node->configEpoch;
935 }
936 dictReleaseIterator(di);
937 if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
938 return max;
939 }
940
941 /* If this node epoch is zero or is not already the greatest across the
942 * cluster (from the POV of the local configuration), this function will:
943 *
944 * 1) Generate a new config epoch, incrementing the current epoch.
945 * 2) Assign the new epoch to this node, WITHOUT any consensus.
946 * 3) Persist the configuration on disk before sending packets with the
947 * new configuration.
948 *
949 * If the new config epoch is generated and assigned, C_OK is returned,
950 * otherwise C_ERR is returned (since the node already has the greatest
951 * configuration around) and no operation is performed.
952 *
953 * Important note: this function violates the principle that config epochs
954 * should be generated with consensus and should be unique across the cluster.
955 * However Redis Cluster uses these auto-generated config epochs in two
956 * cases:
957 *
958 * 1) When slots are closed after importing. Otherwise resharding would be
959 * too expensive.
960 * 2) When CLUSTER FAILOVER is called with options that force a slave to
961 * failover its master even if there is no master majority able to
962 * create a new configuration epoch.
963 *
964 * Redis Cluster will not explode using this function, even in the case of
965 * a collision between this node and another node, generating the same
966 * configuration epoch unilaterally, because the config epoch conflict
967 * resolution algorithm will eventually move colliding nodes to different
968 * config epochs. However using this function may violate the "last failover
969 * wins" rule, so should only be used with care. */
970 int clusterBumpConfigEpochWithoutConsensus(void) {
971 uint64_t maxEpoch = clusterGetMaxEpoch();
972
973 if (myself->configEpoch == 0 ||
974 myself->configEpoch != maxEpoch)
975 {
976 server.cluster->currentEpoch++;
977 myself->configEpoch = server.cluster->currentEpoch;
978 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
979 CLUSTER_TODO_FSYNC_CONFIG);
980 serverLog(LL_WARNING,
981 "New configEpoch set to %llu",
982 (unsigned long long) myself->configEpoch);
983 return C_OK;
984 } else {
985 return C_ERR;
986 }
987 }
988
989 /* This function is called when this node is a master, and we receive from
990 * another master a configuration epoch that is equal to our configuration
991 * epoch.
992 *
993 * BACKGROUND
994 *
995 * It is not possible that different slaves get the same config
996 * epoch during a failover election, because the slaves need to be voted
997 * by a majority. However when we perform a manual resharding of the cluster
998 * the node will assign a configuration epoch to itself without asking
999 * for agreement. Usually resharding happens when the cluster is working well
1000 * and is supervised by the sysadmin, however it is possible for a failover
1001 * to happen exactly while the node we are resharding a slot to assigns itself
1002 * a new configuration epoch, but before it is able to propagate it.
1003 *
1004 * So technically it is possible in this condition that two nodes end with
1005 * the same configuration epoch.
1006 *
1007 * Another possibility is that there are bugs in the implementation causing
1008 * this to happen.
1009 *
1010 * Moreover when a new cluster is created, all the nodes start with the same
1011 * configEpoch. This collision resolution code allows nodes to automatically
1012 * end up with a different configEpoch at startup.
1013 *
1014 * In all the cases, we want a mechanism that resolves this issue automatically
1015 * as a safeguard. The same configuration epoch for masters serving different
1016 * set of slots is not harmful, but it is if the nodes end serving the same
1017 * slots for some reason (manual errors or software bugs) without a proper
1018 * failover procedure.
1019 *
1020 * In general we want a system that eventually always ends with different
1021 * masters having different configuration epochs whatever happened, since
1022 * nothing is worse than a split-brain condition in a distributed system.
1023 *
1024 * BEHAVIOR
1025 *
1026 * When this function gets called, what happens is that if this node
1027 * has the lexicographically smaller Node ID compared to the other node
1028 * with the conflicting epoch (the 'sender' node), it will assign itself
1029 * the greatest configuration epoch currently detected among nodes plus 1.
1030 *
1031 * This means that even if there are multiple nodes colliding, the node
1032 * with the greatest Node ID never moves forward, so eventually all the nodes
1033 * end with a different configuration epoch.
1034 */
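/* A purely illustrative scenario: masters A (ID starting with "aaa...") and
 * B (ID starting with "bbb...") both end up with configEpoch 5 while the
 * cluster currentEpoch is 5. When A processes a packet from B, A has the
 * smaller Node ID, so it bumps currentEpoch to 6 and assigns itself
 * configEpoch 6, while B keeps configEpoch 5 and the collision is gone. */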
1035 void clusterHandleConfigEpochCollision(clusterNode *sender) {
1036 /* Prerequisites: nodes have the same configEpoch and are both masters. */
1037 if (sender->configEpoch != myself->configEpoch ||
1038 !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
1039 /* Don't act if the colliding node has a smaller Node ID. */
1040 if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
1041 /* Get the next ID available, to the best of this node's knowledge. */
1042 server.cluster->currentEpoch++;
1043 myself->configEpoch = server.cluster->currentEpoch;
1044 clusterSaveConfigOrDie(1);
1045 serverLog(LL_VERBOSE,
1046 "WARNING: configEpoch collision with node %.40s."
1047 " configEpoch set to %llu",
1048 sender->name,
1049 (unsigned long long) myself->configEpoch);
1050 }
1051
1052 /* -----------------------------------------------------------------------------
1053 * CLUSTER nodes blacklist
1054 *
1055 * The nodes blacklist is just a way to ensure that a given node with a given
1056 * Node ID is not re-added before some time has elapsed (this time is specified
1057 * in seconds in CLUSTER_BLACKLIST_TTL).
1058 *
1059 * This is useful when we want to remove a node from the cluster completely:
1060 * when CLUSTER FORGET is called, it also puts the node into the blacklist so
1061 * that even if we receive gossip messages from other nodes that still remember
1062 * about the node we want to remove, we don't re-add it before some time.
1063 *
1064 * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, this means
1065 * that redis-trib has 60 seconds to send CLUSTER FORGET messages to nodes
1066 * in the cluster without dealing with the problem of other nodes re-adding
1067 * back the node to nodes we already sent the FORGET command to.
1068 *
1069 * The data structure used is a hash table with an sds string representing
1070 * the node ID as key, and the time when it is ok to re-add the node as
1071 * value.
1072 * -------------------------------------------------------------------------- */
1073
1074 #define CLUSTER_BLACKLIST_TTL 60 /* 1 minute. */
1075
1076
1077 /* Before the addNode() or Exists() operations we always remove expired
1078 * entries from the black list. This is an O(N) operation but it is not a
1079 * problem since add / exists operations are called very infrequently and
1080 * the hash table is supposed to contain very few elements at most.
1081 * However without the cleanup, during long uptimes and with some automated
1082 * node add/removal procedures, entries could accumulate. */
1083 void clusterBlacklistCleanup(void) {
1084 dictIterator *di;
1085 dictEntry *de;
1086
1087 di = dictGetSafeIterator(server.cluster->nodes_black_list);
1088 while((de = dictNext(di)) != NULL) {
1089 int64_t expire = dictGetUnsignedIntegerVal(de);
1090
1091 if (expire < server.unixtime)
1092 dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
1093 }
1094 dictReleaseIterator(di);
1095 }
1096
1097 /* Cleanup the blacklist and add a new node ID to the black list. */
1098 void clusterBlacklistAddNode(clusterNode *node) {
1099 dictEntry *de;
1100 sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);
1101
1102 clusterBlacklistCleanup();
1103 if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
1104 /* If the key was added, duplicate the sds string representation of
1105 * the key for the next lookup. We'll free it at the end. */
1106 id = sdsdup(id);
1107 }
1108 de = dictFind(server.cluster->nodes_black_list,id);
1109 dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
1110 sdsfree(id);
1111 }
1112
1113 /* Return non-zero if the specified node ID exists in the blacklist.
1114 * You don't need to pass an sds string here, any pointer to 40 bytes
1115 * will work. */
1116 int clusterBlacklistExists(char *nodeid) {
1117 sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
1118 int retval;
1119
1120 clusterBlacklistCleanup();
1121 retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
1122 sdsfree(id);
1123 return retval;
1124 }
1125
1126 /* -----------------------------------------------------------------------------
1127 * CLUSTER messages exchange - PING/PONG and gossip
1128 * -------------------------------------------------------------------------- */
1129
1130 /* This function checks if a given node should be marked as FAIL.
1131 * It happens if the following conditions are met:
1132 *
1133 * 1) We received enough failure reports from other master nodes via gossip.
1134 * Enough means that the majority of the masters signaled the node is
1135 * down recently.
1136 * 2) We believe this node is in PFAIL state.
1137 *
1138 * If a failure is detected we also inform the whole cluster about this
1139 * event trying to force every other node to set the FAIL flag for the node.
1140 *
1141 * Note that the form of agreement used here is weak, as we collect the majority
1142 * of masters state during some time, and even if we force agreement by
1143 * propagating the FAIL message, because of partitions we may not reach every
1144 * node. However:
1145 *
1146 * 1) Either we reach the majority and eventually the FAIL state will propagate
1147 * to all the cluster.
1148 * 2) Or there is no majority so no slave promotion will be authorized and the
1149 * FAIL flag will be cleared after some time.
1150 */
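/* For example, in a cluster of 5 masters (server.cluster->size == 5) the
 * needed quorum is 3: if this node is a master its own PFAIL view counts
 * as one vote, so failure reports from at least two other masters are
 * required before the node is flagged as FAIL. */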
1151 void markNodeAsFailingIfNeeded(clusterNode *node) {
1152 int failures;
1153 int needed_quorum = (server.cluster->size / 2) + 1;
1154
1155 if (!nodeTimedOut(node)) return; /* We can reach it. */
1156 if (nodeFailed(node)) return; /* Already FAILing. */
1157
1158 failures = clusterNodeFailureReportsCount(node);
1159 /* Also count myself as a voter if I'm a master. */
1160 if (nodeIsMaster(myself)) failures++;
1161 if (failures < needed_quorum) return; /* No weak agreement from masters. */
1162
1163 serverLog(LL_NOTICE,
1164 "Marking node %.40s as failing (quorum reached).", node->name);
1165
1166 /* Mark the node as failing. */
1167 node->flags &= ~CLUSTER_NODE_PFAIL;
1168 node->flags |= CLUSTER_NODE_FAIL;
1169 node->fail_time = mstime();
1170
1171 /* Broadcast the failing node name to everybody, forcing all the other
1172 * reachable nodes to flag the node as FAIL. */
1173 if (nodeIsMaster(myself)) clusterSendFail(node->name);
1174 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1175 }
1176
1177 /* This function is called only if a node is marked as FAIL, but we are able
1178 * to reach it again. It checks if there are the conditions to undo the FAIL
1179 * state. */
1180 void clearNodeFailureIfNeeded(clusterNode *node) {
1181 mstime_t now = mstime();
1182
1183 serverAssert(nodeFailed(node));
1184
1185 /* For slaves we always clear the FAIL flag if we can contact the
1186 * node again. */
1187 if (nodeIsSlave(node) || node->numslots == 0) {
1188 serverLog(LL_NOTICE,
1189 "Clear FAIL state for node %.40s: %s is reachable again.",
1190 node->name,
1191 nodeIsSlave(node) ? "slave" : "master without slots");
1192 node->flags &= ~CLUSTER_NODE_FAIL;
1193 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1194 }
1195
1196 /* If it is a master and...
1197 * 1) The FAIL state is old enough.
1198 * 2) It is still serving slots from our point of view (not failed over).
1199 * Apparently no one is going to fix these slots, clear the FAIL flag. */
1200 if (nodeIsMaster(node) && node->numslots > 0 &&
1201 (now - node->fail_time) >
1202 (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
1203 {
1204 serverLog(LL_NOTICE,
1205 "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
1206 node->name);
1207 node->flags &= ~CLUSTER_NODE_FAIL;
1208 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1209 }
1210 }
1211
1212 /* Return true if we already have a node in HANDSHAKE state matching the
1213 * specified ip address and port number. This function is used in order to
1214 * avoid adding a new handshake node for the same address multiple times. */
1215 int clusterHandshakeInProgress(char *ip, int port) {
1216 dictIterator *di;
1217 dictEntry *de;
1218
1219 di = dictGetSafeIterator(server.cluster->nodes);
1220 while((de = dictNext(di)) != NULL) {
1221 clusterNode *node = dictGetVal(de);
1222
1223 if (!nodeInHandshake(node)) continue;
1224 if (!strcasecmp(node->ip,ip) && node->port == port) break;
1225 }
1226 dictReleaseIterator(di);
1227 return de != NULL;
1228 }
1229
1230 /* Start a handshake with the specified address if there is not one
1231 * already in progress. Returns non-zero if the handshake was actually
1232 * started. On error zero is returned and errno is set to one of the
1233 * following values:
1234 *
1235 * EAGAIN - There is already a handshake in progress for this address.
1236 * EINVAL - IP or port are not valid. */
1237 int clusterStartHandshake(char *ip, int port) {
1238 clusterNode *n;
1239 char norm_ip[NET_IP_STR_LEN];
1240 struct sockaddr_storage sa;
1241
1242 /* IP sanity check */
1243 if (inet_pton(AF_INET,ip,
1244 &(((struct sockaddr_in *)&sa)->sin_addr)))
1245 {
1246 sa.ss_family = AF_INET;
1247 } else if (inet_pton(AF_INET6,ip,
1248 &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
1249 {
1250 sa.ss_family = AF_INET6;
1251 } else {
1252 errno = EINVAL;
1253 return 0;
1254 }
1255
1256 /* Port sanity check */
1257 if (port <= 0 || port > (65535-CLUSTER_PORT_INCR)) {
1258 errno = EINVAL;
1259 return 0;
1260 }
1261
1262 /* Set norm_ip as the normalized string representation of the node
1263 * IP address. */
1264 memset(norm_ip,0,NET_IP_STR_LEN);
1265 if (sa.ss_family == AF_INET)
1266 inet_ntop(AF_INET,
1267 (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
1268 norm_ip,NET_IP_STR_LEN);
1269 else
1270 inet_ntop(AF_INET6,
1271 (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
1272 norm_ip,NET_IP_STR_LEN);
1273
1274 if (clusterHandshakeInProgress(norm_ip,port)) {
1275 errno = EAGAIN;
1276 return 0;
1277 }
1278
1279 /* Add the node with a random address (NULL as first argument to
1280 * createClusterNode()). Everything will be fixed during the
1281 * handshake. */
1282 n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
1283 memcpy(n->ip,norm_ip,sizeof(n->ip));
1284 n->port = port;
1285 clusterAddNode(n);
1286 return 1;
1287 }
1288
1289 /* Process the gossip section of PING or PONG packets.
1290 * Note that this function assumes that the packet is already sanity-checked
1291 * by the caller, not in the content of the gossip section, but in the
1292 * length. */
1293 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
1294 uint16_t count = ntohs(hdr->count);
1295 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
1296 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
1297
1298 while(count--) {
1299 uint16_t flags = ntohs(g->flags);
1300 clusterNode *node;
1301 sds ci;
1302
1303 ci = representClusterNodeFlags(sdsempty(), flags);
1304 serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d %s",
1305 g->nodename,
1306 g->ip,
1307 ntohs(g->port),
1308 ci);
1309 sdsfree(ci);
1310
1311 /* Update our state accordingly to the gossip sections */
1312 node = clusterLookupNode(g->nodename);
1313 if (node) {
1314 /* We already know this node.
1315 Handle failure reports, only when the sender is a master. */
1316 if (sender && nodeIsMaster(sender) && node != myself) {
1317 if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
1318 if (clusterNodeAddFailureReport(node,sender)) {
1319 serverLog(LL_VERBOSE,
1320 "Node %.40s reported node %.40s as not reachable.",
1321 sender->name, node->name);
1322 }
1323 markNodeAsFailingIfNeeded(node);
1324 } else {
1325 if (clusterNodeDelFailureReport(node,sender)) {
1326 serverLog(LL_VERBOSE,
1327 "Node %.40s reported node %.40s is back online.",
1328 sender->name, node->name);
1329 }
1330 }
1331 }
1332
1333 /* If we already know this node, but it is not reachable, and
1334 * we see a different address in the gossip section of a node that
1335 * can talk with this other node, update the address, disconnect
1336 * the old link if any, so that we'll attempt to connect with the
1337 * new address. */
1338 if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
1339 !(flags & CLUSTER_NODE_NOADDR) &&
1340 !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
1341 (strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port)))
1342 {
1343 if (node->link) freeClusterLink(node->link);
1344 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
1345 node->port = ntohs(g->port);
1346 node->flags &= ~CLUSTER_NODE_NOADDR;
1347 }
1348 } else {
1349 /* If it's not in NOADDR state and we don't have it, we
1350 * start a handshake process against this IP/PORT pair.
1351 *
1352 * Note that we require that the sender of this gossip message
1353 * is a well known node in our cluster, otherwise we risk
1354 * joining another cluster. */
1355 if (sender &&
1356 !(flags & CLUSTER_NODE_NOADDR) &&
1357 !clusterBlacklistExists(g->nodename))
1358 {
1359 clusterStartHandshake(g->ip,ntohs(g->port));
1360 }
1361 }
1362
1363 /* Next node */
1364 g++;
1365 }
1366 }
1367
1368 /* IP -> string conversion. 'buf' is supposed to be at least 46 bytes. */
1369 void nodeIp2String(char *buf, clusterLink *link) {
1370 anetPeerToString(link->fd, buf, NET_IP_STR_LEN, NULL);
1371 }
1372
1373 /* Update the node address to the IP address that can be extracted
1374 * from link->fd, and at the specified port.
1375 * Also disconnect the node link so that we'll connect again to the new
1376 * address.
1377 *
1378 * If the ip/port pair is already correct no operation is performed at
1379 * all.
1380 *
1381 * The function returns 0 if the node address is still the same,
1382 * otherwise 1 is returned. */
1383 int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, int port) {
1384 char ip[NET_IP_STR_LEN] = {0};
1385
1386 /* We don't proceed if the link is the same as the sender link, as this
1387 * function is designed to see if the node link is consistent with the
1388 * symmetric link that is used to receive PINGs from the node.
1389 *
1390 * As a side effect this function never frees the passed 'link', so
1391 * it is safe to call during packet processing. */
1392 if (link == node->link) return 0;
1393
1394 nodeIp2String(ip,link);
1395 if (node->port == port && strcmp(ip,node->ip) == 0) return 0;
1396
1397 /* IP / port is different, update it. */
1398 memcpy(node->ip,ip,sizeof(ip));
1399 node->port = port;
1400 if (node->link) freeClusterLink(node->link);
1401 node->flags &= ~CLUSTER_NODE_NOADDR;
1402 serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
1403 node->name, node->ip, node->port);
1404
1405 /* Check if this is our master and we have to change the
1406 * replication target as well. */
1407 if (nodeIsSlave(myself) && myself->slaveof == node)
1408 replicationSetMaster(node->ip, node->port);
1409 return 1;
1410 }
1411
1412 /* Reconfigure the specified node 'n' as a master. This function is called when
1413 * a node that we believed to be a slave is now acting as master in order to
1414 * update the state of the node. */
1415 void clusterSetNodeAsMaster(clusterNode *n) {
1416 if (nodeIsMaster(n)) return;
1417
1418 if (n->slaveof) {
1419 clusterNodeRemoveSlave(n->slaveof,n);
1420 if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
1421 }
1422 n->flags &= ~CLUSTER_NODE_SLAVE;
1423 n->flags |= CLUSTER_NODE_MASTER;
1424 n->slaveof = NULL;
1425
1426 /* Update config and state. */
1427 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1428 CLUSTER_TODO_UPDATE_STATE);
1429 }
1430
1431 /* This function is called when we receive a master configuration via a
1432 * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
1433 * node, and the set of slots claimed under this configEpoch.
1434 *
1435 * What we do is to rebind the slots with newer configuration compared to our
1436 * local configuration, and if needed, we turn ourselves into a replica of the
1437 * node (see the function comments for more info).
1438 *
1439 * The 'sender' is the node for which we received a configuration update.
1440 * Sometimes it is not actually the "Sender" of the information, like in the
1441 * case we receive the info via an UPDATE packet. */
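/* A purely illustrative scenario: master A owns slots 0-100 and fails; its
 * slave B wins the election and starts claiming slots 0-100 with a greater
 * configEpoch. When A comes back and receives that configuration, every
 * slot it used to serve is rebound to B, A ends up with zero slots, and it
 * reconfigures itself as a replica of B (the newmaster branch below). */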
1442 void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
1443 int j;
1444 clusterNode *curmaster, *newmaster = NULL;
1445 /* The dirty slots list is a list of slots for which we lose the ownership
1446 * while still having keys inside. This usually happens after a failover
1447 * or after a manual cluster reconfiguration operated by the admin.
1448 *
1449 * If the update message is not able to demote a master to slave (in this
1450 * case we'll resync with the master updating the whole key space), we
1451 * need to delete all the keys in the slots we lost ownership of. */
1452 uint16_t dirty_slots[CLUSTER_SLOTS];
1453 int dirty_slots_count = 0;
1454
1455 /* Here we set curmaster to this node or the node this node
1456 * replicates to if it's a slave. In the for loop we are
1457 * interested to check if slots are taken away from curmaster. */
1458 curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;
1459
1460 if (sender == myself) {
1461 serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
1462 return;
1463 }
1464
1465 for (j = 0; j < CLUSTER_SLOTS; j++) {
1466 if (bitmapTestBit(slots,j)) {
1467 /* The slot is already bound to the sender of this message. */
1468 if (server.cluster->slots[j] == sender) continue;
1469
1470 /* The slot is in importing state, it should be modified only
1471 * manually via redis-trib (example: a resharding is in progress
1472 * and the migrating side slot was already closed and is advertising
1473 * a new config. We still want the slot to be closed manually). */
1474 if (server.cluster->importing_slots_from[j]) continue;
1475
1476 /* We rebind the slot to the new node claiming it if:
1477 * 1) The slot was unassigned or the new node claims it with a
1478 * greater configEpoch.
1479 * 2) We are not currently importing the slot. */
1480 if (server.cluster->slots[j] == NULL ||
1481 server.cluster->slots[j]->configEpoch < senderConfigEpoch)
1482 {
1483 /* Was this slot mine, and still contains keys? Mark it as
1484 * a dirty slot. */
1485 if (server.cluster->slots[j] == myself &&
1486 countKeysInSlot(j) &&
1487 sender != myself)
1488 {
1489 dirty_slots[dirty_slots_count] = j;
1490 dirty_slots_count++;
1491 }
1492
1493 if (server.cluster->slots[j] == curmaster)
1494 newmaster = sender;
1495 clusterDelSlot(j);
1496 clusterAddSlot(sender,j);
1497 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1498 CLUSTER_TODO_UPDATE_STATE|
1499 CLUSTER_TODO_FSYNC_CONFIG);
1500 }
1501 }
1502 }
1503
1504 /* If at least one slot was reassigned from a node to another node
1505 * with a greater configEpoch, it is possible that:
1506 * 1) We are a master left without slots. This means that we were
1507 * failed over and we should turn into a replica of the new
1508 * master.
1509 * 2) We are a slave and our master is left without slots. We need
1510 * to replicate to the new slots owner. */
1511 if (newmaster && curmaster->numslots == 0) {
1512 serverLog(LL_WARNING,
1513 "Configuration change detected. Reconfiguring myself "
1514 "as a replica of %.40s", sender->name);
1515 clusterSetMaster(sender);
1516 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1517 CLUSTER_TODO_UPDATE_STATE|
1518 CLUSTER_TODO_FSYNC_CONFIG);
1519 } else if (dirty_slots_count) {
1520 /* If we are here, we received an update message which removed
1521 * ownership of certain slots we still have keys in, but we are still
1522 * serving some other slots, so this master node was not demoted to
1523 * a slave.
1524 *
1525 * In order to maintain a consistent state between keys and slots
1526 * we need to remove all the keys from the slots we lost. */
1527 for (j = 0; j < dirty_slots_count; j++)
1528 delKeysInSlot(dirty_slots[j]);
1529 }
1530 }
1531
1532 /* When this function is called, there is a packet to process starting
1533 * at node->rcvbuf. Releasing the buffer is up to the caller, so this
1534 * function should just handle the higher level stuff of processing the
1535 * packet, modifying the cluster state if needed.
1536 *
1537 * The function returns 1 if the link is still valid after the packet
1538 * was processed, otherwise 0 if the link was freed since the packet
1539 * processing led to some inconsistency error (for instance a PONG
1540 * received from the wrong sender ID). */
1541 int clusterProcessPacket(clusterLink *link) {
1542 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
1543 uint32_t totlen = ntohl(hdr->totlen);
1544 uint16_t type = ntohs(hdr->type);
1545
1546 server.cluster->stats_bus_messages_received++;
1547 serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
1548 type, (unsigned long) totlen);
1549
1550 /* Perform sanity checks */
1551 if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
1552 if (totlen > sdslen(link->rcvbuf)) return 1;
1553
1554 if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
1555 /* Can't handle messages of different versions. */
1556 return 1;
1557 }
1558
1559 uint16_t flags = ntohs(hdr->flags);
1560 uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
1561 clusterNode *sender;
1562
1563 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1564 type == CLUSTERMSG_TYPE_MEET)
1565 {
1566 uint16_t count = ntohs(hdr->count);
1567 uint32_t explen; /* expected length of this packet */
1568
1569 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1570 explen += (sizeof(clusterMsgDataGossip)*count);
1571 if (totlen != explen) return 1;
1572 } else if (type == CLUSTERMSG_TYPE_FAIL) {
1573 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1574
1575 explen += sizeof(clusterMsgDataFail);
1576 if (totlen != explen) return 1;
1577 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
1578 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1579
1580 explen += sizeof(clusterMsgDataPublish) -
1581 8 +
1582 ntohl(hdr->data.publish.msg.channel_len) +
1583 ntohl(hdr->data.publish.msg.message_len);
1584 if (totlen != explen) return 1;
1585 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
1586 type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
1587 type == CLUSTERMSG_TYPE_MFSTART)
1588 {
1589 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1590
1591 if (totlen != explen) return 1;
1592 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
1593 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
1594
1595 explen += sizeof(clusterMsgDataUpdate);
1596 if (totlen != explen) return 1;
1597 }
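/* For illustration (exact struct sizes are defined in cluster.h): for
 * PING/PONG/MEET the expected length is the fixed header size,
 * sizeof(clusterMsg)-sizeof(union clusterMsgData), plus one
 * clusterMsgDataGossip entry per gossip section advertised in hdr->count,
 * so a PING carrying 3 gossip entries must be exactly header + 3 entries
 * long; any other size makes us drop the packet by returning 1 while
 * keeping the link alive. */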
1598
1599 /* Check if the sender is a known node. */
1600 sender = clusterLookupNode(hdr->sender);
1601 if (sender && !nodeInHandshake(sender)) {
1602 /* Update our currentEpoch if we see a newer epoch in the cluster. */
1603 senderCurrentEpoch = ntohu64(hdr->currentEpoch);
1604 senderConfigEpoch = ntohu64(hdr->configEpoch);
1605 if (senderCurrentEpoch > server.cluster->currentEpoch)
1606 server.cluster->currentEpoch = senderCurrentEpoch;
1607 /* Update the sender configEpoch if it is publishing a newer one. */
1608 if (senderConfigEpoch > sender->configEpoch) {
1609 sender->configEpoch = senderConfigEpoch;
1610 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1611 CLUSTER_TODO_FSYNC_CONFIG);
1612 }
1613 /* Update the replication offset info for this node. */
1614 sender->repl_offset = ntohu64(hdr->offset);
1615 sender->repl_offset_time = mstime();
1616 /* If we are a slave performing a manual failover and our master
1617 * sent its offset while already paused, populate the MF state. */
1618 if (server.cluster->mf_end &&
1619 nodeIsSlave(myself) &&
1620 myself->slaveof == sender &&
1621 hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
1622 server.cluster->mf_master_offset == 0)
1623 {
1624 server.cluster->mf_master_offset = sender->repl_offset;
1625 serverLog(LL_WARNING,
1626 "Received replication offset for paused "
1627 "master manual failover: %lld",
1628 server.cluster->mf_master_offset);
1629 }
1630 }
1631
1632 /* Initial processing of PING and MEET requests replying with a PONG. */
1633 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
1634 serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);
1635
1636 /* We use incoming MEET messages in order to set the address
1637 * for 'myself', since only other cluster nodes will send us
1638 * MEET messages on handshakes, when the cluster joins, or
1639 * later if we changed address, and those nodes will use our
1640 * official address to connect to us. So obtaining this address
1641 * from the socket is a simple way to discover / update our own
1642 * address in the cluster without it being hardcoded in the config.
1643 *
1644 * However if we don't have an address at all, we update the address
1645 * even with a normal PING packet. If it's wrong it will be fixed
1646 * by MEET later. */
1647 if (type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') {
1648 char ip[NET_IP_STR_LEN];
1649
1650 if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
1651 strcmp(ip,myself->ip))
1652 {
1653 memcpy(myself->ip,ip,NET_IP_STR_LEN);
1654 serverLog(LL_WARNING,"IP address for this node updated to %s",
1655 myself->ip);
1656 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1657 }
1658 }
1659
1660 /* Add this node if it is new for us and the msg type is MEET.
1661 * In this stage we don't try to add the node with the right
1662 * flags, slaveof pointer, and so forth, as these details will be
1663 * resolved when we receive PONGs from the node. */
1664 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
1665 clusterNode *node;
1666
1667 node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
1668 nodeIp2String(node->ip,link);
1669 node->port = ntohs(hdr->port);
1670 clusterAddNode(node);
1671 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1672 }
1673
1674 /* If this is a MEET packet from an unknown node, we still process
1675 * the gossip section here since we have to trust the sender because
1676 * of the message type. */
1677 if (!sender && type == CLUSTERMSG_TYPE_MEET)
1678 clusterProcessGossipSection(hdr,link);
1679
1680 /* Anyway reply with a PONG */
1681 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
1682 }
1683
1684 /* PING, PONG, MEET: process config information. */
1685 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
1686 type == CLUSTERMSG_TYPE_MEET)
1687 {
1688 serverLog(LL_DEBUG,"%s packet received: %p",
1689 type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
1690 (void*)link->node);
1691 if (link->node) {
1692 if (nodeInHandshake(link->node)) {
1693 /* If we already have this node, try to change the
1694 * IP/port of the node with the new one. */
1695 if (sender) {
1696 serverLog(LL_VERBOSE,
1697 "Handshake: we already know node %.40s, "
1698 "updating the address if needed.", sender->name);
1699 if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
1700 {
1701 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1702 CLUSTER_TODO_UPDATE_STATE);
1703 }
1704 /* Free this node as we already have it. This will
1705 * cause the link to be freed as well. */
1706 clusterDelNode(link->node);
1707 return 0;
1708 }
1709
1710 /* First thing to do is replacing the random name with the
1711 * right node name if this was a handshake stage. */
1712 clusterRenameNode(link->node, hdr->sender);
1713 serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
1714 link->node->name);
1715 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
1716 link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
1717 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1718 } else if (memcmp(link->node->name,hdr->sender,
1719 CLUSTER_NAMELEN) != 0)
1720 {
1721 /* If the reply has a non matching node ID we
1722 * disconnect this node and set it as not having an associated
1723 * address. */
1724 serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
1725 link->node->name,
1726 (int)(mstime()-(link->node->ctime)),
1727 link->node->flags);
1728 link->node->flags |= CLUSTER_NODE_NOADDR;
1729 link->node->ip[0] = '\0';
1730 link->node->port = 0;
1731 freeClusterLink(link);
1732 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1733 return 0;
1734 }
1735 }
1736
1737 /* Update the node address if it changed. */
1738 if (sender && type == CLUSTERMSG_TYPE_PING &&
1739 !nodeInHandshake(sender) &&
1740 nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
1741 {
1742 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1743 CLUSTER_TODO_UPDATE_STATE);
1744 }
1745
1746 /* Update our info about the node */
1747 if (link->node && type == CLUSTERMSG_TYPE_PONG) {
1748 link->node->pong_received = mstime();
1749 link->node->ping_sent = 0;
1750
1751 /* The PFAIL condition can be reversed without external
1752 * help if it is momentary (that is, if it does not
1753 * turn into a FAIL state).
1754 *
1755 * The FAIL condition is also reversible under specific
1756 * conditions detected by clearNodeFailureIfNeeded(). */
1757 if (nodeTimedOut(link->node)) {
1758 link->node->flags &= ~CLUSTER_NODE_PFAIL;
1759 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1760 CLUSTER_TODO_UPDATE_STATE);
1761 } else if (nodeFailed(link->node)) {
1762 clearNodeFailureIfNeeded(link->node);
1763 }
1764 }
1765
1766 /* Check for role switch: slave -> master or master -> slave. */
1767 if (sender) {
1768 if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
1769 sizeof(hdr->slaveof)))
1770 {
1771 /* Node is a master. */
1772 clusterSetNodeAsMaster(sender);
1773 } else {
1774 /* Node is a slave. */
1775 clusterNode *master = clusterLookupNode(hdr->slaveof);
1776
1777 if (nodeIsMaster(sender)) {
1778 /* Master turned into a slave! Reconfigure the node. */
1779 clusterDelNodeSlots(sender);
1780 sender->flags &= ~(CLUSTER_NODE_MASTER|
1781 CLUSTER_NODE_MIGRATE_TO);
1782 sender->flags |= CLUSTER_NODE_SLAVE;
1783
1784 /* Update config and state. */
1785 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1786 CLUSTER_TODO_UPDATE_STATE);
1787 }
1788
1789 /* Master node changed for this slave? */
1790 if (master && sender->slaveof != master) {
1791 if (sender->slaveof)
1792 clusterNodeRemoveSlave(sender->slaveof,sender);
1793 clusterNodeAddSlave(master,sender);
1794 sender->slaveof = master;
1795
1796 /* Update config. */
1797 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
1798 }
1799 }
1800 }
1801
1802 /* Update our info about served slots.
1803 *
1804 * Note: this MUST happen after we update the master/slave state
1805 * so that CLUSTER_NODE_MASTER flag will be set. */
1806
1807 /* Many checks are only needed if the set of served slots this
1808 * instance claims is different compared to the set of slots we have
1809 * for it. Check this ASAP to avoid other computationally expensive
1810 * checks later. */
1811 clusterNode *sender_master = NULL; /* Sender or its master if slave. */
1812 int dirty_slots = 0; /* Sender claimed slots don't match my view? */
1813
1814 if (sender) {
1815 sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
1816 if (sender_master) {
1817 dirty_slots = memcmp(sender_master->slots,
1818 hdr->myslots,sizeof(hdr->myslots)) != 0;
1819 }
1820 }
1821
1822 /* 1) If the sender of the message is a master, and we detected that
1823 * the set of slots it claims changed, scan the slots to see if we
1824 * need to update our configuration. */
1825 if (sender && nodeIsMaster(sender) && dirty_slots)
1826 clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
1827
1828 /* 2) We also check for the reverse condition, that is, the sender
1829 * claims to serve slots we know are served by a master with a
1830 * greater configEpoch. If this happens we inform the sender.
1831 *
1832 * This is useful because sometimes after a partition heals, a
1833 * reappearing master may be the last one to claim a given set of
1834 * hash slots, but with a configuration that other instances know to
1835 * be deprecated. Example:
1836 *
1837 * A and B are master and slave for slots 1,2,3.
1838 * A is partitioned away, B gets promoted.
1839 * B is partitioned away, and A returns available.
1840 *
1841 * Usually B would PING A publishing its set of served slots and its
1842 * configEpoch, but because of the partition B can't inform A of the
1843 * new configuration, so other nodes that have an updated table must
1844 * do it. In this way A will stop acting as a master (or can try to
1845 * failover if the conditions to win the election are met). */
1846 if (sender && dirty_slots) {
1847 int j;
1848
1849 for (j = 0; j < CLUSTER_SLOTS; j++) {
1850 if (bitmapTestBit(hdr->myslots,j)) {
1851 if (server.cluster->slots[j] == sender ||
1852 server.cluster->slots[j] == NULL) continue;
1853 if (server.cluster->slots[j]->configEpoch >
1854 senderConfigEpoch)
1855 {
1856 serverLog(LL_VERBOSE,
1857 "Node %.40s has old slots configuration, sending "
1858 "an UPDATE message about %.40s",
1859 sender->name, server.cluster->slots[j]->name);
1860 clusterSendUpdate(sender->link,
1861 server.cluster->slots[j]);
1862
1863 /* TODO: instead of exiting the loop send every other
1864 * UPDATE packet for other nodes that are the new owner
1865 * of sender's slots. */
1866 break;
1867 }
1868 }
1869 }
1870 }
1871
1872 /* If our config epoch collides with the sender's try to fix
1873 * the problem. */
1874 if (sender &&
1875 nodeIsMaster(myself) && nodeIsMaster(sender) &&
1876 senderConfigEpoch == myself->configEpoch)
1877 {
1878 clusterHandleConfigEpochCollision(sender);
1879 }
1880
1881 /* Get info from the gossip section */
1882 if (sender) clusterProcessGossipSection(hdr,link);
1883 } else if (type == CLUSTERMSG_TYPE_FAIL) {
1884 clusterNode *failing;
1885
1886 if (sender) {
1887 failing = clusterLookupNode(hdr->data.fail.about.nodename);
1888 if (failing &&
1889 !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
1890 {
1891 serverLog(LL_NOTICE,
1892 "FAIL message received from %.40s about %.40s",
1893 hdr->sender, hdr->data.fail.about.nodename);
1894 failing->flags |= CLUSTER_NODE_FAIL;
1895 failing->fail_time = mstime();
1896 failing->flags &= ~CLUSTER_NODE_PFAIL;
1897 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1898 CLUSTER_TODO_UPDATE_STATE);
1899 }
1900 } else {
1901 serverLog(LL_NOTICE,
1902 "Ignoring FAIL message from unknown node %.40s about %.40s",
1903 hdr->sender, hdr->data.fail.about.nodename);
1904 }
1905 } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
1906 robj *channel, *message;
1907 uint32_t channel_len, message_len;
1908
1909 /* Don't bother creating useless objects if there are no
1910 * Pub/Sub subscribers. */
1911 if (dictSize(server.pubsub_channels) ||
1912 listLength(server.pubsub_patterns))
1913 {
1914 channel_len = ntohl(hdr->data.publish.msg.channel_len);
1915 message_len = ntohl(hdr->data.publish.msg.message_len);
1916 channel = createStringObject(
1917 (char*)hdr->data.publish.msg.bulk_data,channel_len);
1918 message = createStringObject(
1919 (char*)hdr->data.publish.msg.bulk_data+channel_len,
1920 message_len);
1921 pubsubPublishMessage(channel,message);
1922 decrRefCount(channel);
1923 decrRefCount(message);
1924 }
1925 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
1926 if (!sender) return 1; /* We don't know that node. */
1927 clusterSendFailoverAuthIfNeeded(sender,hdr);
1928 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
1929 if (!sender) return 1; /* We don't know that node. */
1930 /* We consider this vote only if the sender is a master serving
1931 * a non zero number of slots, and its currentEpoch is greater or
1932 * equal to the epoch in which this node started the election. */
1933 if (nodeIsMaster(sender) && sender->numslots > 0 &&
1934 senderCurrentEpoch >= server.cluster->failover_auth_epoch)
1935 {
1936 server.cluster->failover_auth_count++;
1937 /* Maybe we reached a quorum here, set a flag to make sure
1938 * we check ASAP. */
1939 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
1940 }
1941 } else if (type == CLUSTERMSG_TYPE_MFSTART) {
1942 /* This message is acceptable only if I'm a master and the sender
1943 * is one of my slaves. */
1944 if (!sender || sender->slaveof != myself) return 1;
1945 /* Manual failover requested from slaves. Initialize the state
1946 * accordingly. */
1947 resetManualFailover();
1948 server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
1949 server.cluster->mf_slave = sender;
1950 pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*2));
1951 serverLog(LL_WARNING,"Manual failover requested by slave %.40s.",
1952 sender->name);
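/* Note (summarizing the logic above): clients are paused for twice the
 * manual failover time limit so that this master stops accepting writes
 * long enough for the slave that requested the failover to catch up with
 * our replication offset and complete the switch. */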
1953 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
1954 clusterNode *n; /* The node the update is about. */
1955 uint64_t reportedConfigEpoch =
1956 ntohu64(hdr->data.update.nodecfg.configEpoch);
1957
1958 if (!sender) return 1; /* We don't know the sender. */
1959 n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
1960 if (!n) return 1; /* We don't know the reported node. */
1961 if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
1962
1963 /* If in our current config the node is a slave, set it as a master. */
1964 if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
1965
1966 /* Update the node's configEpoch. */
1967 n->configEpoch = reportedConfigEpoch;
1968 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1969 CLUSTER_TODO_FSYNC_CONFIG);
1970
1971 /* Check the bitmap of served slots and update our
1972 * config accordingly. */
1973 clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
1974 hdr->data.update.nodecfg.slots);
1975 } else {
1976 serverLog(LL_WARNING,"Received unknown packet type: %d", type);
1977 }
1978 return 1;
1979 }
1980
1981 /* This function is called when we detect the link with this node is lost.
1982 We set the node as no longer connected. The Cluster Cron will detect
1983 this condition and will try to get it connected again.
1984
1985 Instead if the node is a temporary node used to accept a query, we
1986 completely free the node on error. */
1987 void handleLinkIOError(clusterLink *link) {
1988 freeClusterLink(link);
1989 }
1990
1991 /* Send data. This is handled using a trivial send buffer that gets
1992 * consumed by write(). We don't try to optimize this for speed too much
1993 * as this is a very low traffic channel. */
1994 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1995 clusterLink *link = (clusterLink*) privdata;
1996 ssize_t nwritten;
1997 UNUSED(el);
1998 UNUSED(mask);
1999
2000 nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
2001 if (nwritten <= 0) {
2002 serverLog(LL_DEBUG,"I/O error writing to node link: %s",
2003 strerror(errno));
2004 handleLinkIOError(link);
2005 return;
2006 }
2007 sdsrange(link->sndbuf,nwritten,-1);
2008 if (sdslen(link->sndbuf) == 0)
2009 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
2010 }
2011
2012 /* Read data. Try to read the first field of the header first to check the
2013 * full length of the packet. When a whole packet is in memory this function
2014 * will call the function to process the packet. And so forth. */
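/* The first 8 bytes of a cluster bus message carry the 4 byte "RCmb"
 * signature followed by the 4 byte totlen field (see the sanity check
 * below), so reading 8 bytes is enough to learn the full packet length
 * before reading the rest of it. */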
2015 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2016 char buf[sizeof(clusterMsg)];
2017 ssize_t nread;
2018 clusterMsg *hdr;
2019 clusterLink *link = (clusterLink*) privdata;
2020 unsigned int readlen, rcvbuflen;
2021 UNUSED(el);
2022 UNUSED(mask);
2023
2024 while(1) { /* Read as long as there is data to read. */
2025 rcvbuflen = sdslen(link->rcvbuf);
2026 if (rcvbuflen < 8) {
2027 /* First, obtain the first 8 bytes to get the full message
2028 * length. */
2029 readlen = 8 - rcvbuflen;
2030 } else {
2031 /* Finally read the full message. */
2032 hdr = (clusterMsg*) link->rcvbuf;
2033 if (rcvbuflen == 8) {
2034 /* Perform some sanity check on the message signature
2035 * and length. */
2036 if (memcmp(hdr->sig,"RCmb",4) != 0 ||
2037 ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
2038 {
2039 serverLog(LL_WARNING,
2040 "Bad message length or signature received "
2041 "from Cluster bus.");
2042 handleLinkIOError(link);
2043 return;
2044 }
2045 }
2046 readlen = ntohl(hdr->totlen) - rcvbuflen;
2047 if (readlen > sizeof(buf)) readlen = sizeof(buf);
2048 }
2049
2050 nread = read(fd,buf,readlen);
2051 if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */
2052
2053 if (nread <= 0) {
2054 /* I/O error... */
2055 serverLog(LL_DEBUG,"I/O error reading from node link: %s",
2056 (nread == 0) ? "connection closed" : strerror(errno));
2057 handleLinkIOError(link);
2058 return;
2059 } else {
2060 /* Read data and recast the pointer to the new buffer. */
2061 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
2062 hdr = (clusterMsg*) link->rcvbuf;
2063 rcvbuflen += nread;
2064 }
2065
2066 /* Total length obtained? Process this packet. */
2067 if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
2068 if (clusterProcessPacket(link)) {
2069 sdsfree(link->rcvbuf);
2070 link->rcvbuf = sdsempty();
2071 } else {
2072 return; /* Link no longer valid. */
2073 }
2074 }
2075 }
2076 }
2077
2078 /* Put stuff into the send buffer.
2079 *
2080 * It is guaranteed that this function will never invalidate the link as
2081 * a side effect, so it is safe to call this function
2082 * from event handlers that will do stuff with the same link later. */
2083 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
2084 if (sdslen(link->sndbuf) == 0 && msglen != 0)
2085 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
2086 clusterWriteHandler,link);
2087
2088 link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
2089 server.cluster->stats_bus_messages_sent++;
2090 }
2091
2092 /* Send a message to all the nodes that are part of the cluster having
2093 * a connected link.
2094 *
2095 * It is guaranteed that this function will never invalidate any
2096 * node->link as a side effect, so it is safe to call this function
2097 * from event handlers that will do stuff with node links later. */
2098 void clusterBroadcastMessage(void *buf, size_t len) {
2099 dictIterator *di;
2100 dictEntry *de;
2101
2102 di = dictGetSafeIterator(server.cluster->nodes);
2103 while((de = dictNext(di)) != NULL) {
2104 clusterNode *node = dictGetVal(de);
2105
2106 if (!node->link) continue;
2107 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
2108 continue;
2109 clusterSendMessage(node->link,buf,len);
2110 }
2111 dictReleaseIterator(di);
2112 }
2113
2114 /* Build the message header. hdr must point to a buffer at least
2115 * sizeof(clusterMsg) in bytes. */
2116 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
2117 int totlen = 0;
2118 uint64_t offset;
2119 clusterNode *master;
2120
2121 /* If this node is a master, we send its slots bitmap and configEpoch.
2122 * If this node is a slave we send the master's information instead (the
2123 * node is flagged as slave so the receiver knows that it is NOT really
2124 * in charge of these slots). */
2125 master = (nodeIsSlave(myself) && myself->slaveof) ?
2126 myself->slaveof : myself;
2127
2128 memset(hdr,0,sizeof(*hdr));
2129 hdr->ver = htons(CLUSTER_PROTO_VER);
2130 hdr->sig[0] = 'R';
2131 hdr->sig[1] = 'C';
2132 hdr->sig[2] = 'm';
2133 hdr->sig[3] = 'b';
2134 hdr->type = htons(type);
2135 memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
2136
2137 memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
2138 memset(hdr->slaveof,0,CLUSTER_NAMELEN);
2139 if (myself->slaveof != NULL)
2140 memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
2141 hdr->port = htons(server.port);
2142 hdr->flags = htons(myself->flags);
2143 hdr->state = server.cluster->state;
2144
2145 /* Set the currentEpoch and configEpochs. */
2146 hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
2147 hdr->configEpoch = htonu64(master->configEpoch);
2148
2149 /* Set the replication offset. */
2150 if (nodeIsSlave(myself))
2151 offset = replicationGetSlaveOffset();
2152 else
2153 offset = server.master_repl_offset;
2154 hdr->offset = htonu64(offset);
2155
2156 /* Set the message flags. */
2157 if (nodeIsMaster(myself) && server.cluster->mf_end)
2158 hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
2159
2160 /* Compute the message length for certain messages. For other messages
2161 * this is up to the caller. */
2162 if (type == CLUSTERMSG_TYPE_FAIL) {
2163 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2164 totlen += sizeof(clusterMsgDataFail);
2165 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2166 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2167 totlen += sizeof(clusterMsgDataUpdate);
2168 }
2169 hdr->totlen = htonl(totlen);
2170 /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
2171 }
2172
2173 /* Send a PING or PONG packet to the specified node, making sure to add enough
2174 * gossip information. */
2175 void clusterSendPing(clusterLink *link, int type) {
2176 unsigned char *buf;
2177 clusterMsg *hdr;
2178 int gossipcount = 0; /* Number of gossip sections added so far. */
2179 int wanted; /* Number of gossip sections we want to append if possible. */
2180 int totlen; /* Total packet length. */
2181 /* freshnodes is the max number of nodes we can hope to append at all:
2182 * nodes available minus two (ourself and the node we are sending the
2183 * message to). However in practice there may be fewer valid nodes since
2184 * nodes in handshake state or disconnected nodes are not considered.
2185 int freshnodes = dictSize(server.cluster->nodes)-2;
2186
2187 /* How many gossip sections we want to add? 1/10 of the number of nodes
2188 * and anyway at least 3. Why 1/10?
2189 *
2190 * If we have N masters, with N/10 entries, and we consider that in
2191 * node_timeout we exchange with each other node at least 4 packets
2192 * (we ping in the worst case in node_timeout/2 time, and we also
2193 * receive two pings from the host), we have a total of 8 packets
2194 * in the node_timeout*2 failure reports validity time. So we have
2195 * that, for a single PFAIL node, we can expect to receive the following
2196 * number of failure reports (in the specified window of time):
2197 *
2198 * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
2199 *
2200 * PROB = probability of being featured in a single gossip entry,
2201 * which is 1 / NUM_OF_NODES.
2202 * ENTRIES = 10.
2203 * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
2204 *
2205 * If we assume we have just masters (so num of nodes and num of masters
2206 * is the same), with 1/10 we always get over the majority, and specifically
2207 * 80% of the number of nodes, to account for many masters failing at the
2208 * same time.
2209 *
2210 * Since we have non-voting slaves that lower the probability of an entry
2211 * to feature our node, we set the number of entries per packet as
2212 * 10% of the total nodes we have. */
2213 wanted = floor(dictSize(server.cluster->nodes)/10);
2214 if (wanted < 3) wanted = 3;
2215 if (wanted > freshnodes) wanted = freshnodes;
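/* Worked example (illustrative): with 60 known nodes, wanted =
 * floor(60/10) = 6 gossip entries per packet; with 12 nodes the 1/10 rule
 * would give 1, so it is raised to the minimum of 3; and it is never more
 * than freshnodes, the nodes other than ourselves and the receiver. */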
2216
2217 /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
2218 * later according to the number of gossip sections we really were able
2219 * to put inside the packet. */
2220 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2221 totlen += (sizeof(clusterMsgDataGossip)*wanted);
2222 /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
2223 * sizeof(clusterMsg) or more. */
2224 if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
2225 buf = zcalloc(totlen);
2226 hdr = (clusterMsg*) buf;
2227
2228 /* Populate the header. */
2229 if (link->node && type == CLUSTERMSG_TYPE_PING)
2230 link->node->ping_sent = mstime();
2231 clusterBuildMessageHdr(hdr,type);
2232
2233 /* Populate the gossip fields */
2234 int maxiterations = wanted*3;
2235 while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
2236 dictEntry *de = dictGetRandomKey(server.cluster->nodes);
2237 clusterNode *this = dictGetVal(de);
2238 clusterMsgDataGossip *gossip;
2239 int j;
2240
2241 /* Don't include this node: the whole packet header is about us
2242 * already, so we just gossip about other nodes. */
2243 if (this == myself) continue;
2244
2245 /* Give a bias to FAIL/PFAIL nodes. */
2246 if (maxiterations > wanted*2 &&
2247 !(this->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL)))
2248 continue;
2249
2250 /* In the gossip section don't include:
2251 * 1) Nodes in HANDSHAKE state.
2252 * 2) Nodes with the NOADDR flag set.
2253 * 3) Disconnected nodes if they don't have configured slots.
2254 */
2255 if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
2256 (this->link == NULL && this->numslots == 0))
2257 {
2258 freshnodes--; /* Technically not correct, but saves CPU. */
2259 continue;
2260 }
2261
2262 /* Check if we already added this node */
2263 for (j = 0; j < gossipcount; j++) {
2264 if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
2265 CLUSTER_NAMELEN) == 0) break;
2266 }
2267 if (j != gossipcount) continue;
2268
2269 /* Add it */
2270 freshnodes--;
2271 gossip = &(hdr->data.ping.gossip[gossipcount]);
2272 memcpy(gossip->nodename,this->name,CLUSTER_NAMELEN);
2273 gossip->ping_sent = htonl(this->ping_sent);
2274 gossip->pong_received = htonl(this->pong_received);
2275 memcpy(gossip->ip,this->ip,sizeof(this->ip));
2276 gossip->port = htons(this->port);
2277 gossip->flags = htons(this->flags);
2278 gossip->notused1 = 0;
2279 gossip->notused2 = 0;
2280 gossipcount++;
2281 }
2282
2283 /* Ready to send... fix the totlen field and queue the message in the
2284 * output buffer. */
2285 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2286 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
2287 hdr->count = htons(gossipcount);
2288 hdr->totlen = htonl(totlen);
2289 clusterSendMessage(link,buf,totlen);
2290 zfree(buf);
2291 }
2292
2293 /* Send a PONG packet to every connected node that's not in handshake state
2294 * and for which we have a valid link.
2295 *
2296 * In Redis Cluster pongs are not used just for failure detection, but also
2297 * to carry important configuration information. So broadcasting a pong is
2298 * useful when something changes in the configuration and we want to make
2299 * the cluster aware ASAP (for instance after a slave promotion).
2300 *
2301 * The 'target' argument specifies the receiving instances using the
2302 * defines below:
2303 *
2304 * CLUSTER_BROADCAST_ALL -> All known instances.
2305 * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
2306 */
2307 #define CLUSTER_BROADCAST_ALL 0
2308 #define CLUSTER_BROADCAST_LOCAL_SLAVES 1
2309 void clusterBroadcastPong(int target) {
2310 dictIterator *di;
2311 dictEntry *de;
2312
2313 di = dictGetSafeIterator(server.cluster->nodes);
2314 while((de = dictNext(di)) != NULL) {
2315 clusterNode *node = dictGetVal(de);
2316
2317 if (!node->link) continue;
2318 if (node == myself || nodeInHandshake(node)) continue;
2319 if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
2320 int local_slave =
2321 nodeIsSlave(node) && node->slaveof &&
2322 (node->slaveof == myself || node->slaveof == myself->slaveof);
2323 if (!local_slave) continue;
2324 }
2325 clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
2326 }
2327 dictReleaseIterator(di);
2328 }
2329
2330 /* Send a PUBLISH message.
2331 *
2332 * If link is NULL, then the message is broadcasted to the whole cluster. */
2333 void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
2334 unsigned char buf[sizeof(clusterMsg)], *payload;
2335 clusterMsg *hdr = (clusterMsg*) buf;
2336 uint32_t totlen;
2337 uint32_t channel_len, message_len;
2338
2339 channel = getDecodedObject(channel);
2340 message = getDecodedObject(message);
2341 channel_len = sdslen(channel->ptr);
2342 message_len = sdslen(message->ptr);
2343
2344 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
2345 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2346 totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
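/* Note: the "- 8" above assumes the layout of clusterMsgDataPublish in
 * cluster.h, where bulk_data is declared as an 8 byte placeholder: the
 * real channel and message bytes are appended in its place, so the
 * placeholder size is subtracted before adding the actual lengths. */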
2347
2348 hdr->data.publish.msg.channel_len = htonl(channel_len);
2349 hdr->data.publish.msg.message_len = htonl(message_len);
2350 hdr->totlen = htonl(totlen);
2351
2352 /* Try to use the local buffer if possible */
2353 if (totlen < sizeof(buf)) {
2354 payload = buf;
2355 } else {
2356 payload = zmalloc(totlen);
2357 memcpy(payload,hdr,sizeof(*hdr));
2358 hdr = (clusterMsg*) payload;
2359 }
2360 memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
2361 memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
2362 message->ptr,sdslen(message->ptr));
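/* Resulting wire format of the PUBLISH payload: channel_len and
 * message_len in network byte order, followed by the raw channel bytes and
 * then the raw message bytes, matching how clusterProcessPacket() rebuilds
 * the two objects on the receiving side. */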
2363
2364 if (link)
2365 clusterSendMessage(link,payload,totlen);
2366 else
2367 clusterBroadcastMessage(payload,totlen);
2368
2369 decrRefCount(channel);
2370 decrRefCount(message);
2371 if (payload != buf) zfree(payload);
2372 }
2373
2374 /* Send a FAIL message to all the nodes we are able to contact.
2375 * The FAIL message is sent when we detect that a node is failing
2376 * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
2377 * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
2378 * nodes to do the same ASAP. */
2379 void clusterSendFail(char *nodename) {
2380 unsigned char buf[sizeof(clusterMsg)];
2381 clusterMsg *hdr = (clusterMsg*) buf;
2382
2383 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
2384 memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
2385 clusterBroadcastMessage(buf,ntohl(hdr->totlen));
2386 }
2387
2388 /* Send an UPDATE message to the specified link carrying the specified 'node'
2389 * slots configuration. The node name, slots bitmap, and configEpoch info
2390 * are included. */
2391 void clusterSendUpdate(clusterLink *link, clusterNode *node) {
2392 unsigned char buf[sizeof(clusterMsg)];
2393 clusterMsg *hdr = (clusterMsg*) buf;
2394
2395 if (link == NULL) return;
2396 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
2397 memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
2398 hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
2399 memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
2400 clusterSendMessage(link,buf,ntohl(hdr->totlen));
2401 }
2402
2403 /* -----------------------------------------------------------------------------
2404 * CLUSTER Pub/Sub support
2405 *
2406 * For now we do very little, just propagating PUBLISH messages across the whole
2407 * cluster. In the future we'll try to get smarter and avoid propagating those
2408 * messages to hosts without receivers for a given channel.
2409 * -------------------------------------------------------------------------- */
2410 void clusterPropagatePublish(robj *channel, robj *message) {
2411 clusterSendPublish(NULL, channel, message);
2412 }
2413
2414 /* -----------------------------------------------------------------------------
2415 * SLAVE node specific functions
2416 * -------------------------------------------------------------------------- */
2417
2418 /* This function sends a FAILOVER_AUTH_REQUEST message to every node in order to
2419 * see if there is the quorum for this slave instance to failover its failing
2420 * master.
2421 *
2422 * Note that we send the failover request to everybody, master and slave nodes,
2423 * but only the masters are supposed to reply to our query. */
2424 void clusterRequestFailoverAuth(void) {
2425 unsigned char buf[sizeof(clusterMsg)];
2426 clusterMsg *hdr = (clusterMsg*) buf;
2427 uint32_t totlen;
2428
2429 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
2430 /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
2431 * in the header to communicate to the nodes receiving the message that
2432 * they should authorize the failover even if the master is working. */
2433 if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
2434 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2435 hdr->totlen = htonl(totlen);
2436 clusterBroadcastMessage(buf,totlen);
2437 }
2438
2439 /* Send a FAILOVER_AUTH_ACK message to the specified node. */
2440 void clusterSendFailoverAuth(clusterNode *node) {
2441 unsigned char buf[sizeof(clusterMsg)];
2442 clusterMsg *hdr = (clusterMsg*) buf;
2443 uint32_t totlen;
2444
2445 if (!node->link) return;
2446 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
2447 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2448 hdr->totlen = htonl(totlen);
2449 clusterSendMessage(node->link,buf,totlen);
2450 }
2451
2452 /* Send a MFSTART message to the specified node. */
2453 void clusterSendMFStart(clusterNode *node) {
2454 unsigned char buf[sizeof(clusterMsg)];
2455 clusterMsg *hdr = (clusterMsg*) buf;
2456 uint32_t totlen;
2457
2458 if (!node->link) return;
2459 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
2460 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2461 hdr->totlen = htonl(totlen);
2462 clusterSendMessage(node->link,buf,totlen);
2463 }
2464
2465 /* Vote for the node asking for our vote if there are the conditions. */
2466 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
2467 clusterNode *master = node->slaveof;
2468 uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
2469 uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
2470 unsigned char *claimed_slots = request->myslots;
2471 int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
2472 int j;
2473
2474 /* If we are not a master serving at least 1 slot, we don't have the
2475 * right to vote, as the cluster size in Redis Cluster is the number
2476 * of masters serving at least one slot, and the quorum is half the
2477 * cluster size plus one. */
2478 if (nodeIsSlave(myself) || myself->numslots == 0) return;
2479
2480 /* Request epoch must be >= our currentEpoch.
2481 * Note that it is impossible for it to actually be greater since
2482 * our currentEpoch was updated as a side effect of receiving this
2483 * request, if the request epoch was greater. */
2484 if (requestCurrentEpoch < server.cluster->currentEpoch) {
2485 serverLog(LL_WARNING,
2486 "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
2487 node->name,
2488 (unsigned long long) requestCurrentEpoch,
2489 (unsigned long long) server.cluster->currentEpoch);
2490 return;
2491 }
2492
2493 /* I already voted for this epoch? Return ASAP. */
2494 if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
2495 serverLog(LL_WARNING,
2496 "Failover auth denied to %.40s: already voted for epoch %llu",
2497 node->name,
2498 (unsigned long long) server.cluster->currentEpoch);
2499 return;
2500 }
2501
2502 /* Node must be a slave and its master down.
2503 * The master can be non failing if the request is flagged
2504 * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
2505 if (nodeIsMaster(node) || master == NULL ||
2506 (!nodeFailed(master) && !force_ack))
2507 {
2508 if (nodeIsMaster(node)) {
2509 serverLog(LL_WARNING,
2510 "Failover auth denied to %.40s: it is a master node",
2511 node->name);
2512 } else if (master == NULL) {
2513 serverLog(LL_WARNING,
2514 "Failover auth denied to %.40s: I don't know its master",
2515 node->name);
2516 } else if (!nodeFailed(master)) {
2517 serverLog(LL_WARNING,
2518 "Failover auth denied to %.40s: its master is up",
2519 node->name);
2520 }
2521 return;
2522 }
2523
2524 /* Refuse the vote if we already voted for a slave of this master
2525 * within two times the node timeout. This is not strictly needed for
2526 * correctness of the algorithm but makes the base case more linear. */
2527 if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
2528 {
2529 serverLog(LL_WARNING,
2530 "Failover auth denied to %.40s: "
2531 "can't vote about this master before %lld milliseconds",
2532 node->name,
2533 (long long) ((server.cluster_node_timeout*2)-
2534 (mstime() - node->slaveof->voted_time)));
2535 return;
2536 }
2537
2538 /* The slave requesting the vote must have a configEpoch for the claimed
2539 * slots that is >= the one of the masters currently serving the same
2540 * slots in the current configuration. */
2541 for (j = 0; j < CLUSTER_SLOTS; j++) {
2542 if (bitmapTestBit(claimed_slots, j) == 0) continue;
2543 if (server.cluster->slots[j] == NULL ||
2544 server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
2545 {
2546 continue;
2547 }
2548 /* If we reached this point we found a slot that in our current slots
2549 * is served by a master with a greater configEpoch than the one claimed
2550 * by the slave requesting our vote. Refuse to vote for this slave. */
2551 serverLog(LL_WARNING,
2552 "Failover auth denied to %.40s: "
2553 "slot %d epoch (%llu) > reqEpoch (%llu)",
2554 node->name, j,
2555 (unsigned long long) server.cluster->slots[j]->configEpoch,
2556 (unsigned long long) requestConfigEpoch);
2557 return;
2558 }
2559
2560 /* We can vote for this slave. */
2561 clusterSendFailoverAuth(node);
2562 server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
2563 node->slaveof->voted_time = mstime();
2564 serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
2565 node->name, (unsigned long long) server.cluster->currentEpoch);
2566 }
2567
2568 /* This function returns the "rank" of this instance, a slave, in the context
2569 * of its master-slaves ring. The rank of the slave is given by the number of
2570 * other slaves for the same master that have a better replication offset
2571 * compared to the local one (better means, greater, so they claim more data).
2572 *
2573 * A slave with rank 0 is the one with the greatest (most up to date)
2574 * replication offset, and so forth. Note that, because of how the rank is
2575 * computed, multiple slaves may have the same rank if they have the same offset.
2576 *
2577 * The slave rank is used to add a delay to start an election in order to
2578 * get voted and replace a failing master. Slaves with better replication
2579 * offsets are more likely to win. */
2580 int clusterGetSlaveRank(void) {
2581 long long myoffset;
2582 int j, rank = 0;
2583 clusterNode *master;
2584
2585 serverAssert(nodeIsSlave(myself));
2586 master = myself->slaveof;
2587 if (master == NULL) return 0; /* Never called by slaves without master. */
2588
2589 myoffset = replicationGetSlaveOffset();
2590 for (j = 0; j < master->numslaves; j++)
2591 if (master->slaves[j] != myself &&
2592 master->slaves[j]->repl_offset > myoffset) rank++;
2593 return rank;
2594 }
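/* Illustrative example: if a master has three slaves with replication
 * offsets 1000, 900 and 900, the first gets rank 0 while the other two
 * both get rank 1, since only one slave has a strictly greater offset
 * than theirs. */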
2595
2596 /* This function is called by clusterHandleSlaveFailover() in order to
2597 * let the slave log why it is not able to failover. Sometimes the
2598 * conditions are not met, but since the failover function is called again and
2599 * again, we can't log the same thing continuously.
2600 *
2601 * This function works by logging only if a given set of conditions are
2602 * true:
2603 *
2604 * 1) The reason for which the failover can't be initiated changed.
2605 * The reasons also include a NONE reason we reset the state to
2606 * when the slave finds that its master is fine (no FAIL flag).
2607 * 2) Also, the log is emitted again if the master is still down and
2608 * the reason for not failing over is still the same, but more than
2609 * CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
2610 * 3) Finally, the function only logs if the slave is down for more than
2611 * five seconds + NODE_TIMEOUT. This way nothing is logged when a
2612 * failover starts in a reasonable time.
2613 *
2614 * The function is called with the reason why the slave can't failover
2615 * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
2616 *
2617 * The function is guaranteed to be called only if 'myself' is a slave. */
2618 void clusterLogCantFailover(int reason) {
2619 char *msg;
2620 static time_t lastlog_time = 0;
2621 mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
2622
2623 /* Don't log if we have the same reason for some time. */
2624 if (reason == server.cluster->cant_failover_reason &&
2625 time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
2626 return;
2627
2628 server.cluster->cant_failover_reason = reason;
2629
2630 /* We also don't emit any log if the master failed not long ago; the
2631 * goal of this function is to log slaves stuck in a stalled condition for
2632 * a long time. */
2633 if (myself->slaveof &&
2634 nodeFailed(myself->slaveof) &&
2635 (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;
2636
2637 switch(reason) {
2638 case CLUSTER_CANT_FAILOVER_DATA_AGE:
2639 msg = "Disconnected from master for longer than allowed. "
2640 "Please check the 'cluster-slave-validity-factor' configuration "
2641 "option.";
2642 break;
2643 case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
2644 msg = "Waiting the delay before I can start a new failover.";
2645 break;
2646 case CLUSTER_CANT_FAILOVER_EXPIRED:
2647 msg = "Failover attempt expired.";
2648 break;
2649 case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
2650 msg = "Waiting for votes, but majority still not reached.";
2651 break;
2652 default:
2653 msg = "Unknown reason code.";
2654 break;
2655 }
2656 lastlog_time = time(NULL);
2657 serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
2658 }
2659
2660 /* This function implements the final part of automatic and manual failovers,
2661 * where the slave grabs its master's hash slots, and propagates the new
2662 * configuration.
2663 *
2664 * Note that it's up to the caller to be sure that the node got a new
2665 * configuration epoch already. */
2666 void clusterFailoverReplaceYourMaster(void) {
2667 int j;
2668 clusterNode *oldmaster = myself->slaveof;
2669
2670 if (nodeIsMaster(myself) || oldmaster == NULL) return;
2671
2672 /* 1) Turn this node into a master. */
2673 clusterSetNodeAsMaster(myself);
2674 replicationUnsetMaster();
2675
2676 /* 2) Claim all the slots assigned to our master. */
2677 for (j = 0; j < CLUSTER_SLOTS; j++) {
2678 if (clusterNodeGetSlotBit(oldmaster,j)) {
2679 clusterDelSlot(j);
2680 clusterAddSlot(myself,j);
2681 }
2682 }
2683
2684 /* 3) Update state and save config. */
2685 clusterUpdateState();
2686 clusterSaveConfigOrDie(1);
2687
2688 /* 4) Pong all the other nodes so that they can update the state
2689 * accordingly and detect that we switched to master role. */
2690 clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
2691
2692 /* 5) If there was a manual failover in progress, clear the state. */
2693 resetManualFailover();
2694 }
2695
2696 /* This function is called if we are a slave node and our master serving
2697 * a non-zero amount of hash slots is in FAIL state.
2698 *
2699 * The goal of this function is:
2700 * 1) To check if we are able to perform a failover: is our data updated?
2701 * 2) Try to get elected by masters.
2702 * 3) Perform the failover informing all the other nodes.
2703 */
2704 void clusterHandleSlaveFailover(void) {
2705 mstime_t data_age;
2706 mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
2707 int needed_quorum = (server.cluster->size / 2) + 1;
2708 int manual_failover = server.cluster->mf_end != 0 &&
2709 server.cluster->mf_can_start;
2710 mstime_t auth_timeout, auth_retry_time;
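/* Example of the quorum arithmetic above: with a cluster size of 5
 * (masters serving at least one slot), needed_quorum is 5/2 + 1 = 3
 * failover authorization votes. */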
2711
2712 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
2713
2714 /* Compute the failover timeout (the max time we have to send votes
2715 * and wait for replies), and the failover retry time (the time to wait
2716 * before trying to get voted again).
2717 *
2718 * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
2719 * Retry is two times the Timeout.
2720 */
2721 auth_timeout = server.cluster_node_timeout*2;
2722 if (auth_timeout < 2000) auth_timeout = 2000;
2723 auth_retry_time = auth_timeout*2;
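/* Worked example (illustrative): with cluster-node-timeout at 15000 ms the
 * election timeout is 30000 ms and the retry time 60000 ms; with a very
 * small node timeout such as 500 ms, the timeout is clamped to the 2000 ms
 * floor and the retry time becomes 4000 ms. */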
2724
2725 /* Pre conditions to run the function, that must be met both in case
2726 * of an automatic or manual failover:
2727 * 1) We are a slave.
2728 * 2) Our master is flagged as FAIL, or this is a manual failover.
2729 * 3) It is serving slots. */
2730 if (nodeIsMaster(myself) ||
2731 myself->slaveof == NULL ||
2732 (!nodeFailed(myself->slaveof) && !manual_failover) ||
2733 myself->slaveof->numslots == 0)
2734 {
2735 /* There are no reasons to failover, so we set the reason why we
2736 * are returning without failing over to NONE. */
2737 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
2738 return;
2739 }
2740
2741 /* Set data_age to the number of milliseconds we are disconnected from
2742 * the master. */
2743 if (server.repl_state == REPL_STATE_CONNECTED) {
2744 data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
2745 * 1000;
2746 } else {
2747 data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
2748 }
2749
2750 /* Remove the node timeout from the data age as it is fine that we are
2751 * disconnected from our master at least for the time it was down to be
2752 * flagged as FAIL, that's the baseline. */
2753 if (data_age > server.cluster_node_timeout)
2754 data_age -= server.cluster_node_timeout;
2755
2756 /* Check if our data is recent enough according to the slave validity
2757 * factor configured by the user.
2758 *
2759 * Check bypassed for manual failovers. */
2760 if (server.cluster_slave_validity_factor &&
2761 data_age >
2762 (((mstime_t)server.repl_ping_slave_period * 1000) +
2763 (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
2764 {
2765 if (!manual_failover) {
2766 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
2767 return;
2768 }
2769 }
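/* Worked example (illustrative): with repl-ping-slave-period 10 seconds,
 * cluster-node-timeout 15000 ms and cluster-slave-validity-factor 10, a
 * slave refuses to start an automatic failover once data_age exceeds
 * 10*1000 + 15000*10 = 160000 ms of disconnection from the master. */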
2770
2771 /* If the previous failover attempt timed out and the retry time has
2772 * elapsed, we can set up a new one. */
2773 if (auth_age > auth_retry_time) {
2774 server.cluster->failover_auth_time = mstime() +
2775 500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
2776 random() % 500; /* Random delay between 0 and 500 milliseconds. */
2777 server.cluster->failover_auth_count = 0;
2778 server.cluster->failover_auth_sent = 0;
2779 server.cluster->failover_auth_rank = clusterGetSlaveRank();
2780 /* We add another delay that is proportional to the slave rank.
2781 * Specifically 1 second * rank. This way slaves that probably have a
2782 * less updated replication offset are penalized. */
2783 server.cluster->failover_auth_time +=
2784 server.cluster->failover_auth_rank * 1000;
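/* At this point the election start time is: now + 500 ms fixed delay +
 * a random delay in [0,500) ms + 1000 ms per rank position, so e.g. a
 * slave ranked #2 will wait between 2500 and 3000 ms (unless this is a
 * manual failover, which clears the delay just below). */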
2785 /* However if this is a manual failover, no delay is needed. */
2786 if (server.cluster->mf_end) {
2787 server.cluster->failover_auth_time = mstime();
2788 server.cluster->failover_auth_rank = 0;
2789 }
2790 serverLog(LL_WARNING,
2791 "Start of election delayed for %lld milliseconds "
2792 "(rank #%d, offset %lld).",
2793 server.cluster->failover_auth_time - mstime(),
2794 server.cluster->failover_auth_rank,
2795 replicationGetSlaveOffset());
2796 /* Now that we have a scheduled election, broadcast our offset
2797 * to all the other slaves so that they'll update their offsets
2798 * if our offset is better. */
2799 clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
2800 return;
2801 }
2802
2803 /* It is possible that we received more updated offsets from other
2804 * slaves for the same master since we computed our election delay.
2805 * Update the delay if our rank changed.
2806 *
2807 * Not performed if this is a manual failover. */
2808 if (server.cluster->failover_auth_sent == 0 &&
2809 server.cluster->mf_end == 0)
2810 {
2811 int newrank = clusterGetSlaveRank();
2812 if (newrank > server.cluster->failover_auth_rank) {
2813 long long added_delay =
2814 (newrank - server.cluster->failover_auth_rank) * 1000;
2815 server.cluster->failover_auth_time += added_delay;
2816 server.cluster->failover_auth_rank = newrank;
2817 serverLog(LL_WARNING,
2818 "Slave rank updated to #%d, added %lld milliseconds of delay.",
2819 newrank, added_delay);
2820 }
2821 }
2822
2823 /* Return ASAP if we still can't start the election. */
2824 if (mstime() < server.cluster->failover_auth_time) {
2825 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
2826 return;
2827 }
2828
2829 /* Return ASAP if the election is too old to be valid. */
2830 if (auth_age > auth_timeout) {
2831 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
2832 return;
2833 }
2834
2835 /* Ask for votes if needed. */
2836 if (server.cluster->failover_auth_sent == 0) {
2837 server.cluster->currentEpoch++;
2838 server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
2839 serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
2840 (unsigned long long) server.cluster->currentEpoch);
2841 clusterRequestFailoverAuth();
2842 server.cluster->failover_auth_sent = 1;
2843 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2844 CLUSTER_TODO_UPDATE_STATE|
2845 CLUSTER_TODO_FSYNC_CONFIG);
2846 return; /* Wait for replies. */
2847 }
2848
2849 /* Check if we reached the quorum. */
2850 if (server.cluster->failover_auth_count >= needed_quorum) {
2851 /* We have the quorum, we can finally failover the master. */
2852
2853 serverLog(LL_WARNING,
2854 "Failover election won: I'm the new master.");
2855
2856 /* Update my configEpoch to the epoch of the election. */
2857 if (myself->configEpoch < server.cluster->failover_auth_epoch) {
2858 myself->configEpoch = server.cluster->failover_auth_epoch;
2859 serverLog(LL_WARNING,
2860 "configEpoch set to %llu after successful failover",
2861 (unsigned long long) myself->configEpoch);
2862 }
2863
2864 /* Take responsibility for the cluster slots. */
2865 clusterFailoverReplaceYourMaster();
2866 } else {
2867 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
2868 }
2869 }
2870
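/* A minimal sketch, for illustration only, of the total election delay used
 * above: a fixed base of 500 milliseconds plus up to 500 milliseconds of
 * random jitter (computed earlier in this function), plus one extra second
 * per rank position. The helper below is hypothetical and is not called
 * anywhere in this file. */
static mstime_t exampleFailoverAuthDelay(int rank) {
    mstime_t delay = 500 + random() % 500; /* Fixed delay plus jitter. */
    delay += (mstime_t)rank * 1000;        /* Penalize higher ranks. */
    return delay;
}
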
2871 /* -----------------------------------------------------------------------------
2872 * CLUSTER slave migration
2873 *
2874 * Slave migration is the process that allows a slave of a master that is
2875 * already covered by at least one other slave, to "migrate" to a master that
2876 * is orphaned, that is, left with no working slaves.
2877 * ------------------------------------------------------------------------- */
2878
2879 /* This function is responsible for deciding whether this replica should be migrated
2880 * to a different (orphaned) master. It is called by the clusterCron() function
2881 * only if:
2882 *
2883 * 1) We are a slave node.
2884 * 2) It was detected that there is at least one orphaned master in
2885 * the cluster.
2886 * 3) We are a slave of one of the masters with the greatest number of
2887 * slaves.
2888 *
2889 * These checks are performed by the caller since they require iterating
2890 * the nodes anyway, so we only spend time in clusterHandleSlaveMigration()
2891 * when it is definitely needed.
2892 *
2893 * The function is called with a pre-computed max_slaves, that is the max
2894 * number of working (not in FAIL state) slaves for a single master.
2895 *
2896 * Additional conditions for migration are examined inside the function.
2897 */
2898 void clusterHandleSlaveMigration(int max_slaves) {
2899 int j, okslaves = 0;
2900 clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
2901 dictIterator *di;
2902 dictEntry *de;
2903
2904 /* Step 1: Don't migrate if the cluster state is not ok. */
2905 if (server.cluster->state != CLUSTER_OK) return;
2906
2907 /* Step 2: Don't migrate if my master will not be left with at least
2908 * 'migration-barrier' slaves after my migration. */
2909 if (mymaster == NULL) return;
2910 for (j = 0; j < mymaster->numslaves; j++)
2911 if (!nodeFailed(mymaster->slaves[j]) &&
2912 !nodeTimedOut(mymaster->slaves[j])) okslaves++;
2913 if (okslaves <= server.cluster_migration_barrier) return;
2914
2915 /* Step 3: Identify a candidate for migration, and check if among the
2916 * masters with the greatest number of ok slaves, I'm the one with the
2917 * smallest node ID (the "candidate slave").
2918 *
2919 * Note: this means that eventually a replica migration will occur
2920 * since slaves that are reachable again always have their FAIL flag
2921 * cleared, so eventually there must be a candidate. At the same time
2922 * this does not mean that there are no race conditions possible (two
2923 * slaves migrating at the same time), but this is unlikely to
2924 * happen, and is harmless when it happens. */
2925 candidate = myself;
2926 di = dictGetSafeIterator(server.cluster->nodes);
2927 while((de = dictNext(di)) != NULL) {
2928 clusterNode *node = dictGetVal(de);
2929 int okslaves = 0, is_orphaned = 1;
2930
2931 /* We want to migrate only if this master is working, orphaned, and
2932 * used to have slaves, or if it failed over a master that had slaves
2933 * (MIGRATE_TO flag). This way we only migrate to instances that were
2934 * supposed to have replicas. */
2935 if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
2936 if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;
2937
2938 /* Check number of working slaves. */
2939 if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
2940 if (okslaves > 0) is_orphaned = 0;
2941
2942 if (is_orphaned) {
2943 if (!target && node->numslots > 0) target = node;
2944
2945 /* Track the starting time of the orphaned condition for this
2946 * master. */
2947 if (!node->orphaned_time) node->orphaned_time = mstime();
2948 } else {
2949 node->orphaned_time = 0;
2950 }
2951
2952 /* Check if I'm the slave candidate for the migration: attached
2953 * to a master with the maximum number of slaves and with the smallest
2954 * node ID. */
2955 if (okslaves == max_slaves) {
2956 for (j = 0; j < node->numslaves; j++) {
2957 if (memcmp(node->slaves[j]->name,
2958 candidate->name,
2959 CLUSTER_NAMELEN) < 0)
2960 {
2961 candidate = node->slaves[j];
2962 }
2963 }
2964 }
2965 }
2966 dictReleaseIterator(di);
2967
2968 /* Step 4: perform the migration if there is a target, and if I'm the
2969 * candidate, but only if the master is continuously orphaned for a
2970 * couple of seconds, so that during failovers, we give some time to
2971 * the natural slaves of this instance to advertise their switch from
2972 * the old master to the new one. */
2973 if (target && candidate == myself &&
2974 (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY)
2975 {
2976 serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
2977 target->name);
2978 clusterSetMaster(target);
2979 }
2980 }
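/* Purely illustrative restatement of the conditions used above to consider
 * a master a valid migration target: a working master, flagged MIGRATE_TO,
 * with no working slaves and serving at least one slot. Hypothetical helper,
 * not used anywhere in this file. */
static int exampleMasterIsOrphaned(clusterNode *node) {
    if (nodeIsSlave(node) || nodeFailed(node)) return 0;
    if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) return 0;
    if (clusterCountNonFailingSlaves(node) != 0) return 0;
    return node->numslots > 0;
}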
2981
2982 /* -----------------------------------------------------------------------------
2983 * CLUSTER manual failover
2984 *
2985 * These are the important steps performed by slaves during a manual failover:
2986 * 1) The user sends the CLUSTER FAILOVER command. The failover state is initialized
2987 * setting mf_end to the millisecond unix time at which we'll abort the
2988 * attempt.
2989 * 2) Slave sends a MFSTART message to the master requesting to pause clients
2990 * for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
2991 * When master is paused for manual failover, it also starts to flag
2992 * packets with CLUSTERMSG_FLAG0_PAUSED.
2993 * 3) Slave waits for master to send its replication offset flagged as PAUSED.
2994 * 4) If slave received the offset from the master, and its offset matches,
2995 * mf_can_start is set to 1, and clusterHandleSlaveFailover() will perform
2996 * the failover as usually, with the difference that the vote request
2997 * will be modified to force masters to vote for a slave that has a
2998 * working master.
2999 *
3000 * From the point of view of the master things are simpler: when a
3001 * MFSTART packet is received, the master sets mf_end as well and
3002 * the sender in mf_slave. During the time limit for the manual failover
3003 * the master will just send PINGs more often to this slave, flagged with
3004 * the PAUSED flag, so that the slave will set mf_master_offset when receiving
3005 * a packet from the master with this flag set.
3006 *
3007 * The goal of the manual failover is to perform a fast failover without
3008 * data loss due to the asynchronous master-slave replication.
3009 * -------------------------------------------------------------------------- */
3010
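/* Minimal sketch of the pause window described at step 2 above: the slave
 * asks the master to pause clients for twice CLUSTER_MF_TIMEOUT, so that the
 * pause safely covers the whole manual failover attempt. Hypothetical helper,
 * shown only to make the timing explicit. */
static mstime_t exampleManualFailoverPauseEnd(void) {
    return mstime() + CLUSTER_MF_TIMEOUT * 2;
}
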
3011 /* Reset the manual failover state. This works for both masters and slaves,
3012 * as all the state about manual failover is cleared.
3013 *
3014 * The function can be used both to initialize the manual failover state at
3015 * startup or to abort a manual failover in progress. */
3016 void resetManualFailover(void) {
3017 if (server.cluster->mf_end && clientsArePaused()) {
3018 server.clients_pause_end_time = 0;
3019 clientsArePaused(); /* Just use the side effect of the function. */
3020 }
3021 server.cluster->mf_end = 0; /* No manual failover in progress. */
3022 server.cluster->mf_can_start = 0;
3023 server.cluster->mf_slave = NULL;
3024 server.cluster->mf_master_offset = 0;
3025 }
3026
3027 /* If a manual failover timed out, abort it. */
3028 void manualFailoverCheckTimeout(void) {
3029 if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
3030 serverLog(LL_WARNING,"Manual failover timed out.");
3031 resetManualFailover();
3032 }
3033 }
3034
3035 /* This function is called from the cluster cron function in order to go
3036 * forward with a manual failover state machine. */
3037 void clusterHandleManualFailover(void) {
3038 /* Return ASAP if no manual failover is in progress. */
3039 if (server.cluster->mf_end == 0) return;
3040
3041 /* If mf_can_start is non-zero, the failover was already triggered so the
3042 * next steps are performed by clusterHandleSlaveFailover(). */
3043 if (server.cluster->mf_can_start) return;
3044
3045 if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */
3046
3047 if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
3048 /* Our replication offset matches the master replication offset
3049 * announced after clients were paused. We can start the failover. */
3050 server.cluster->mf_can_start = 1;
3051 serverLog(LL_WARNING,
3052 "All master replication stream processed, "
3053 "manual failover can start.");
3054 }
3055 }
3056
3057 /* -----------------------------------------------------------------------------
3058 * CLUSTER cron job
3059 * -------------------------------------------------------------------------- */
3060
3061 /* This is executed 10 times every second */
3062 void clusterCron(void) {
3063 dictIterator *di;
3064 dictEntry *de;
3065 int update_state = 0;
3066 int orphaned_masters; /* How many masters there are without ok slaves. */
3067 int max_slaves; /* Max number of ok slaves for a single master. */
3068 int this_slaves; /* Number of ok slaves for our master (if we are slave). */
3069 mstime_t min_pong = 0, now = mstime();
3070 clusterNode *min_pong_node = NULL;
3071 static unsigned long long iteration = 0;
3072 mstime_t handshake_timeout;
3073
3074 iteration++; /* Number of times this function was called so far. */
3075
3076 /* The handshake timeout is the time after which a handshake node that was
3077 * not turned into a normal node is removed from the nodes. Usually it is
3078 * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
3079 * the value of 1 second. */
3080 handshake_timeout = server.cluster_node_timeout;
3081 if (handshake_timeout < 1000) handshake_timeout = 1000;
3082
3083 /* Check if we have disconnected nodes and re-establish the connection. */
3084 di = dictGetSafeIterator(server.cluster->nodes);
3085 while((de = dictNext(di)) != NULL) {
3086 clusterNode *node = dictGetVal(de);
3087
3088 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
3089
3090 /* A Node in HANDSHAKE state has a limited lifespan equal to the
3091 * configured node timeout. */
3092 if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
3093 clusterDelNode(node);
3094 continue;
3095 }
3096
3097 if (node->link == NULL) {
3098 int fd;
3099 mstime_t old_ping_sent;
3100 clusterLink *link;
3101
3102 fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
3103 node->port+CLUSTER_PORT_INCR, NET_FIRST_BIND_ADDR);
3104 if (fd == -1) {
3105 /* We got a synchronous error from connect before
3106 * clusterSendPing() had a chance to be called.
3107 * If node->ping_sent is zero, failure detection can't work,
3108 * so we claim we actually sent a ping now (that will
3109 * be really sent as soon as the link is obtained). */
3110 if (node->ping_sent == 0) node->ping_sent = mstime();
3111 serverLog(LL_DEBUG, "Unable to connect to "
3112 "Cluster Node [%s]:%d -> %s", node->ip,
3113 node->port+CLUSTER_PORT_INCR,
3114 server.neterr);
3115 continue;
3116 }
3117 link = createClusterLink(node);
3118 link->fd = fd;
3119 node->link = link;
3120 aeCreateFileEvent(server.el,link->fd,AE_READABLE,
3121 clusterReadHandler,link);
3122 /* Queue a PING in the new connection ASAP: this is crucial
3123 * to avoid false positives in failure detection.
3124 *
3125 * If the node is flagged as MEET, we send a MEET message instead
3126 * of a PING one, to force the receiver to add us in its node
3127 * table. */
3128 old_ping_sent = node->ping_sent;
3129 clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
3130 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
3131 if (old_ping_sent) {
3132 /* If there was an active ping before the link was
3133 * disconnected, we want to restore the ping time, otherwise it would be
3134 * replaced by the clusterSendPing() call. */
3135 node->ping_sent = old_ping_sent;
3136 }
3137 /* We can clear the flag after the first packet is sent.
3138 * If we'll never receive a PONG, we'll never send new packets
3139 * to this node. Instead after the PONG is received and we
3140 * are no longer in meet/handshake status, we want to send
3141 * normal PING packets. */
3142 node->flags &= ~CLUSTER_NODE_MEET;
3143
3144 serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
3145 node->name, node->ip, node->port+CLUSTER_PORT_INCR);
3146 }
3147 }
3148 dictReleaseIterator(di);
3149
3150 /* Ping some random node 1 time every 10 iterations, so that we usually ping
3151 * one random node every second. */
3152 if (!(iteration % 10)) {
3153 int j;
3154
3155 /* Check a few random nodes and ping the one with the oldest
3156 * pong_received time. */
3157 for (j = 0; j < 5; j++) {
3158 de = dictGetRandomKey(server.cluster->nodes);
3159 clusterNode *this = dictGetVal(de);
3160
3161 /* Don't ping nodes disconnected or with a ping currently active. */
3162 if (this->link == NULL || this->ping_sent != 0) continue;
3163 if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
3164 continue;
3165 if (min_pong_node == NULL || min_pong > this->pong_received) {
3166 min_pong_node = this;
3167 min_pong = this->pong_received;
3168 }
3169 }
3170 if (min_pong_node) {
3171 serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
3172 clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
3173 }
3174 }
3175
3176 /* Iterate nodes to check if we need to flag something as failing.
3177 * This loop is also responsible to:
3178 * 1) Check if there are orphaned masters (masters without non failing
3179 * slaves).
3180 * 2) Count the max number of non failing slaves for a single master.
3181 * 3) Count the number of slaves for our master, if we are a slave. */
3182 orphaned_masters = 0;
3183 max_slaves = 0;
3184 this_slaves = 0;
3185 di = dictGetSafeIterator(server.cluster->nodes);
3186 while((de = dictNext(di)) != NULL) {
3187 clusterNode *node = dictGetVal(de);
3188 now = mstime(); /* Use an updated time at every iteration. */
3189 mstime_t delay;
3190
3191 if (node->flags &
3192 (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
3193 continue;
3194
3195 /* Orphaned master check, useful only if the current instance
3196 * is a slave that may migrate to another master. */
3197 if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
3198 int okslaves = clusterCountNonFailingSlaves(node);
3199
3200 /* A master is orphaned if it is serving a non-zero number of
3201 * slots, has no working slaves, but used to have at least one
3202 * slave, or failed over a master that used to have slaves. */
3203 if (okslaves == 0 && node->numslots > 0 &&
3204 node->flags & CLUSTER_NODE_MIGRATE_TO)
3205 {
3206 orphaned_masters++;
3207 }
3208 if (okslaves > max_slaves) max_slaves = okslaves;
3209 if (nodeIsSlave(myself) && myself->slaveof == node)
3210 this_slaves = okslaves;
3211 }
3212
3213 /* If we are waiting for the PONG more than half the cluster
3214 * timeout, reconnect the link: maybe there is a connection
3215 * issue even if the node is alive. */
3216 if (node->link && /* is connected */
3217 now - node->link->ctime >
3218 server.cluster_node_timeout && /* was not already reconnected */
3219 node->ping_sent && /* we already sent a ping */
3220 node->pong_received < node->ping_sent && /* still waiting pong */
3221 /* and we are waiting for the pong more than timeout/2 */
3222 now - node->ping_sent > server.cluster_node_timeout/2)
3223 {
3224 /* Disconnect the link, it will be reconnected automatically. */
3225 freeClusterLink(node->link);
3226 }
3227
3228 /* If we have currently no active ping in this instance, and the
3229 * received PONG is older than half the cluster timeout, send
3230 * a new ping now, to ensure all the nodes are pinged without
3231 * a too big delay. */
3232 if (node->link &&
3233 node->ping_sent == 0 &&
3234 (now - node->pong_received) > server.cluster_node_timeout/2)
3235 {
3236 clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3237 continue;
3238 }
3239
3240 /* If we are a master and one of the slaves requested a manual
3241 * failover, ping it continuously. */
3242 if (server.cluster->mf_end &&
3243 nodeIsMaster(myself) &&
3244 server.cluster->mf_slave == node &&
3245 node->link)
3246 {
3247 clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
3248 continue;
3249 }
3250
3251 /* Check only if we have an active ping for this instance. */
3252 if (node->ping_sent == 0) continue;
3253
3254 /* Compute the delay of the PONG. Note that if we already received
3255 * the PONG, then node->ping_sent is zero, so we can't reach this
3256 * code at all. */
3257 delay = now - node->ping_sent;
3258
3259 if (delay > server.cluster_node_timeout) {
3260 /* Timeout reached. Set the node as possibly failing if it is
3261 * not already in this state. */
3262 if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
3263 serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
3264 node->name);
3265 node->flags |= CLUSTER_NODE_PFAIL;
3266 update_state = 1;
3267 }
3268 }
3269 }
3270 dictReleaseIterator(di);
3271
3272 /* If we are a slave node but the replication is still turned off,
3273 * enable it if we know the address of our master and it appears to
3274 * be up. */
3275 if (nodeIsSlave(myself) &&
3276 server.masterhost == NULL &&
3277 myself->slaveof &&
3278 nodeHasAddr(myself->slaveof))
3279 {
3280 replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
3281 }
3282
3283 /* Abort a manual failover if the timeout is reached. */
3284 manualFailoverCheckTimeout();
3285
3286 if (nodeIsSlave(myself)) {
3287 clusterHandleManualFailover();
3288 clusterHandleSlaveFailover();
3289 /* If there are orphaned masters, and we are a slave among the masters
3290 * with the max number of non-failing slaves, consider migrating to
3291 * the orphaned masters. Note that it does not make sense to try
3292 * a migration if there is no master with at least *two* working
3293 * slaves. */
3294 if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
3295 clusterHandleSlaveMigration(max_slaves);
3296 }
3297
3298 if (update_state || server.cluster->state == CLUSTER_FAIL)
3299 clusterUpdateState();
3300 }
3301
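/* Illustrative restatement of the failure detection rule applied in the
 * loop above: a node is flagged PFAIL when a PING has been pending for
 * longer than the configured node timeout. Hypothetical helper, not used
 * anywhere in this file. */
static int examplePingTimedOut(clusterNode *node, mstime_t now) {
    return node->ping_sent != 0 &&
           (now - node->ping_sent) > server.cluster_node_timeout;
}
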
3302 /* This function is called before the event handler returns to sleep for
3303 * events. It is useful to perform operations that must be done ASAP in
3304 * reaction to events fired but that are not safe to perform inside event
3305 * handlers, or to perform potentially expensive tasks that we need to do
3306 * a single time before replying to clients. */
3307 void clusterBeforeSleep(void) {
3308 /* Handle failover, this is needed when it is likely that there is already
3309 * the quorum from masters in order to react fast. */
3310 if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)
3311 clusterHandleSlaveFailover();
3312
3313 /* Update the cluster state. */
3314 if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)
3315 clusterUpdateState();
3316
3317 /* Save the config, possibly using fsync. */
3318 if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {
3319 int fsync = server.cluster->todo_before_sleep &
3320 CLUSTER_TODO_FSYNC_CONFIG;
3321 clusterSaveConfigOrDie(fsync);
3322 }
3323
3324 /* Reset our flags (not strictly needed since every single function
3325 * called for flags set should be able to clear its flag). */
3326 server.cluster->todo_before_sleep = 0;
3327 }
3328
3329 void clusterDoBeforeSleep(int flags) {
3330 server.cluster->todo_before_sleep |= flags;
3331 }
3332
3333 /* -----------------------------------------------------------------------------
3334 * Slots management
3335 * -------------------------------------------------------------------------- */
3336
3337 /* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set,
3338 * otherwise 0. */
3339 int bitmapTestBit(unsigned char *bitmap, int pos) {
3340 off_t byte = pos/8;
3341 int bit = pos&7;
3342 return (bitmap[byte] & (1<<bit)) != 0;
3343 }
3344
3345 /* Set the bit at position 'pos' in a bitmap. */
3346 void bitmapSetBit(unsigned char *bitmap, int pos) {
3347 off_t byte = pos/8;
3348 int bit = pos&7;
3349 bitmap[byte] |= 1<<bit;
3350 }
3351
3352 /* Clear the bit at position 'pos' in a bitmap. */
3353 void bitmapClearBit(unsigned char *bitmap, int pos) {
3354 off_t byte = pos/8;
3355 int bit = pos&7;
3356 bitmap[byte] &= ~(1<<bit);
3357 }
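/* Usage sketch for the three helpers above, assuming the standard sizing of
 * the per-node slot bitmap (CLUSTER_SLOTS/8 bytes, i.e. 2048 bytes for 16384
 * slots). Illustration only; this function is not called anywhere. */
static void exampleSlotBitmapUsage(void) {
    unsigned char slots[CLUSTER_SLOTS/8];

    memset(slots,0,sizeof(slots));
    bitmapSetBit(slots,100);                     /* Claim slot 100. */
    serverAssert(bitmapTestBit(slots,100) == 1);
    bitmapClearBit(slots,100);                   /* Release it again. */
    serverAssert(bitmapTestBit(slots,100) == 0);
}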
3358
3359 /* Return non-zero if there is at least one master with slaves in the cluster.
3360 * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
3361 * MIGRATE_TO flag when a master gets its first slot. */
3362 int clusterMastersHaveSlaves(void) {
3363 dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3364 dictEntry *de;
3365 int slaves = 0;
3366 while((de = dictNext(di)) != NULL) {
3367 clusterNode *node = dictGetVal(de);
3368
3369 if (nodeIsSlave(node)) continue;
3370 slaves += node->numslaves;
3371 }
3372 dictReleaseIterator(di);
3373 return slaves != 0;
3374 }
3375
3376 /* Set the slot bit and return the old value. */
3377 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
3378 int old = bitmapTestBit(n->slots,slot);
3379 bitmapSetBit(n->slots,slot);
3380 if (!old) {
3381 n->numslots++;
3382 /* When a master gets its first slot, even if it has no slaves,
3383 * it gets flagged with MIGRATE_TO, that is, the master is a valid
3384 * target for replica migration, if and only if at least one of
3385 * the other masters has slaves right now.
3386 *
3387 * Normally masters are valid targets of replica migration if:
3388 * 1. They used to have slaves (but no longer have them).
3389 * 2. They are slaves failing over a master that used to have slaves.
3390 *
3391 * However new masters with slots assigned are considered valid
3392 * migration targets if the rest of the cluster is not slave-less.
3393 *
3394 * See https://github.com/antirez/redis/issues/3043 for more info. */
3395 if (n->numslots == 1 && clusterMastersHaveSlaves())
3396 n->flags |= CLUSTER_NODE_MIGRATE_TO;
3397 }
3398 return old;
3399 }
3400
3401 /* Clear the slot bit and return the old value. */
3402 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
3403 int old = bitmapTestBit(n->slots,slot);
3404 bitmapClearBit(n->slots,slot);
3405 if (old) n->numslots--;
3406 return old;
3407 }
3408
3409 /* Return the slot bit from the cluster node structure. */
3410 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
3411 return bitmapTestBit(n->slots,slot);
3412 }
3413
3414 /* Add the specified slot to the list of slots that node 'n' will
3415 * serve. Return C_OK if the operation ended with success.
3416 * If the slot is already assigned to another instance this is considered
3417 * an error and C_ERR is returned. */
3418 int clusterAddSlot(clusterNode *n, int slot) {
3419 if (server.cluster->slots[slot]) return C_ERR;
3420 clusterNodeSetSlotBit(n,slot);
3421 server.cluster->slots[slot] = n;
3422 return C_OK;
3423 }
3424
3425 /* Delete the specified slot marking it as unassigned.
3426 * Returns C_OK if the slot was assigned, otherwise if the slot was
3427 * already unassigned C_ERR is returned. */
3428 int clusterDelSlot(int slot) {
3429 clusterNode *n = server.cluster->slots[slot];
3430
3431 if (!n) return C_ERR;
3432 serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
3433 server.cluster->slots[slot] = NULL;
3434 return C_OK;
3435 }
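/* Usage sketch for the two helpers above: clusterAddSlot() refuses to assign
 * a slot that already has an owner, so moving a slot to a different node
 * requires deleting it first (this is what CLUSTER SETSLOT <slot> NODE does
 * further below). Hypothetical helper, illustration only. */
static void exampleReassignSlot(clusterNode *to, int slot) {
    if (server.cluster->slots[slot] != NULL) clusterDelSlot(slot);
    serverAssert(clusterAddSlot(to,slot) == C_OK);
}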
3436
3437 /* Delete all the slots associated with the specified node.
3438 * The number of deleted slots is returned. */
3439 int clusterDelNodeSlots(clusterNode *node) {
3440 int deleted = 0, j;
3441
3442 for (j = 0; j < CLUSTER_SLOTS; j++) {
3443 if (!clusterNodeGetSlotBit(node,j)) continue;
3444 clusterDelSlot(j); deleted++;
3445 }
3446 return deleted;
3447 }
3448
3449 /* Clear the migrating / importing state for all the slots.
3450 * This is useful at initialization and when turning a master into a slave. */
3451 void clusterCloseAllSlots(void) {
3452 memset(server.cluster->migrating_slots_to,0,
3453 sizeof(server.cluster->migrating_slots_to));
3454 memset(server.cluster->importing_slots_from,0,
3455 sizeof(server.cluster->importing_slots_from));
3456 }
3457
3458 /* -----------------------------------------------------------------------------
3459 * Cluster state evaluation function
3460 * -------------------------------------------------------------------------- */
3461
3462 /* The following are defines that are only used in the evaluation function
3463 * and are based on heuristics. Actually the main point about the rejoin and
3464 * writable delay is that they should be a few orders of magnitude larger
3465 * than the network latency. */
3466 #define CLUSTER_MAX_REJOIN_DELAY 5000
3467 #define CLUSTER_MIN_REJOIN_DELAY 500
3468 #define CLUSTER_WRITABLE_DELAY 2000
3469
3470 void clusterUpdateState(void) {
3471 int j, new_state;
3472 int reachable_masters = 0;
3473 static mstime_t among_minority_time;
3474 static mstime_t first_call_time = 0;
3475
3476 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
3477
3478 /* If this is a master node, wait some time before turning the state
3479 * into OK, since it is not a good idea to rejoin the cluster as a writable
3480 * master, after a reboot, without giving the cluster a chance to
3481 * reconfigure this node. Note that the delay is calculated starting from
3482 * the first call to this function and not since the server start, in order
3483 * to not count the DB loading time. */
3484 if (first_call_time == 0) first_call_time = mstime();
3485 if (nodeIsMaster(myself) &&
3486 server.cluster->state == CLUSTER_FAIL &&
3487 mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;
3488
3489 /* Start assuming the state is OK. We'll turn it into FAIL if there
3490 * are the right conditions. */
3491 new_state = CLUSTER_OK;
3492
3493 /* Check if all the slots are covered. */
3494 if (server.cluster_require_full_coverage) {
3495 for (j = 0; j < CLUSTER_SLOTS; j++) {
3496 if (server.cluster->slots[j] == NULL ||
3497 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
3498 {
3499 new_state = CLUSTER_FAIL;
3500 break;
3501 }
3502 }
3503 }
3504
3505 /* Compute the cluster size, that is the number of master nodes
3506 * serving at least a single slot.
3507 *
3508 * At the same time count the number of reachable masters having
3509 * at least one slot. */
3510 {
3511 dictIterator *di;
3512 dictEntry *de;
3513
3514 server.cluster->size = 0;
3515 di = dictGetSafeIterator(server.cluster->nodes);
3516 while((de = dictNext(di)) != NULL) {
3517 clusterNode *node = dictGetVal(de);
3518
3519 if (nodeIsMaster(node) && node->numslots) {
3520 server.cluster->size++;
3521 if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
3522 reachable_masters++;
3523 }
3524 }
3525 dictReleaseIterator(di);
3526 }
3527
3528 /* If we are in a minority partition, change the cluster state
3529 * to FAIL. */
3530 {
3531 int needed_quorum = (server.cluster->size / 2) + 1;
3532
3533 if (reachable_masters < needed_quorum) {
3534 new_state = CLUSTER_FAIL;
3535 among_minority_time = mstime();
3536 }
3537 }
3538
3539 /* Log a state change */
3540 if (new_state != server.cluster->state) {
3541 mstime_t rejoin_delay = server.cluster_node_timeout;
3542
3543 /* If the instance is a master and was partitioned away with the
3544 * minority, don't let it accept queries for some time after the
3545 * partition heals, to make sure there is enough time to receive
3546 * a configuration update. */
3547 if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
3548 rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
3549 if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
3550 rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
3551
3552 if (new_state == CLUSTER_OK &&
3553 nodeIsMaster(myself) &&
3554 mstime() - among_minority_time < rejoin_delay)
3555 {
3556 return;
3557 }
3558
3559 /* Change the state and log the event. */
3560 serverLog(LL_WARNING,"Cluster state changed: %s",
3561 new_state == CLUSTER_OK ? "ok" : "fail");
3562 server.cluster->state = new_state;
3563 }
3564 }
3565
3566 /* This function is called after the node startup in order to verify that data
3567 * loaded from disk is in agreement with the cluster configuration:
3568 *
3569 * 1) If we find keys about hash slots we have no responsibility for, the
3570 * following happens:
3571 * A) If no other node is in charge according to the current cluster
3572 * configuration, we add these slots to our node.
3573 * B) If according to our config other nodes are already in charge for
3574 * these slots, we set the slots as IMPORTING from our point of view
3575 * in order to justify we have those slots, and in order to make
3576 * redis-trib aware of the issue, so that it can try to fix it.
3577 * 2) If we find data in a DB different than DB0 we return C_ERR to
3578 * signal the caller it should quit the server with an error message
3579 * or take other actions.
3580 *
3581 * The function always returns C_OK even if it will try to correct
3582 * the error described in "1". However if data is found in DB different
3583 * from DB0, C_ERR is returned.
3584 *
3585 * The function also uses the logging facility in order to warn the user
3586 * about desynchronizations between the data we have in memory and the
3587 * cluster configuration. */
3588 int verifyClusterConfigWithData(void) {
3589 int j;
3590 int update_config = 0;
3591
3592 /* If this node is a slave, don't perform the check at all as we
3593 * completely depend on the replication stream. */
3594 if (nodeIsSlave(myself)) return C_OK;
3595
3596 /* Make sure we only have keys in DB0. */
3597 for (j = 1; j < server.dbnum; j++) {
3598 if (dictSize(server.db[j].dict)) return C_ERR;
3599 }
3600
3601 /* Check that all the slots we see populated in memory have a corresponding
3602 * entry in the cluster table. Otherwise fix the table. */
3603 for (j = 0; j < CLUSTER_SLOTS; j++) {
3604 if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
3605 /* Check if we are assigned to this slot or if we are importing it.
3606 * In both cases check the next slot as the configuration makes
3607 * sense. */
3608 if (server.cluster->slots[j] == myself ||
3609 server.cluster->importing_slots_from[j] != NULL) continue;
3610
3611 /* If we are here, data and cluster config don't agree: we have
3612 * slot 'j' populated even though we are neither importing it nor
3613 * assigned to it. Fix this condition. */
3614
3615 update_config++;
3616 /* Case A: slot is unassigned. Take responsibility for it. */
3617 if (server.cluster->slots[j] == NULL) {
3618 serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
3619 "Taking responsibility for it.",j);
3620 clusterAddSlot(myself,j);
3621 } else {
3622 serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
3623 "assigned to another node. "
3624 "Setting it to importing state.",j);
3625 server.cluster->importing_slots_from[j] = server.cluster->slots[j];
3626 }
3627 }
3628 if (update_config) clusterSaveConfigOrDie(1);
3629 return C_OK;
3630 }
3631
3632 /* -----------------------------------------------------------------------------
3633 * SLAVE nodes handling
3634 * -------------------------------------------------------------------------- */
3635
3636 /* Set the specified node 'n' as master for this node.
3637 * If this node is currently a master, it is turned into a slave. */
3638 void clusterSetMaster(clusterNode *n) {
3639 serverAssert(n != myself);
3640 serverAssert(myself->numslots == 0);
3641
3642 if (nodeIsMaster(myself)) {
3643 myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
3644 myself->flags |= CLUSTER_NODE_SLAVE;
3645 clusterCloseAllSlots();
3646 } else {
3647 if (myself->slaveof)
3648 clusterNodeRemoveSlave(myself->slaveof,myself);
3649 }
3650 myself->slaveof = n;
3651 clusterNodeAddSlave(n,myself);
3652 replicationSetMaster(n->ip, n->port);
3653 resetManualFailover();
3654 }
3655
3656 /* -----------------------------------------------------------------------------
3657 * Nodes to string representation functions.
3658 * -------------------------------------------------------------------------- */
3659
3660 struct redisNodeFlags {
3661 uint16_t flag;
3662 char *name;
3663 };
3664
3665 static struct redisNodeFlags redisNodeFlagsTable[] = {
3666 {CLUSTER_NODE_MYSELF, "myself,"},
3667 {CLUSTER_NODE_MASTER, "master,"},
3668 {CLUSTER_NODE_SLAVE, "slave,"},
3669 {CLUSTER_NODE_PFAIL, "fail?,"},
3670 {CLUSTER_NODE_FAIL, "fail,"},
3671 {CLUSTER_NODE_HANDSHAKE, "handshake,"},
3672 {CLUSTER_NODE_NOADDR, "noaddr,"}
3673 };
3674
3675 /* Concatenate the comma separated list of node flags to the given SDS
3676 * string 'ci'. */
3677 sds representClusterNodeFlags(sds ci, uint16_t flags) {
3678 if (flags == 0) {
3679 ci = sdscat(ci,"noflags,");
3680 } else {
3681 int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
3682 for (i = 0; i < size; i++) {
3683 struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
3684 if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
3685 }
3686 }
3687 sdsIncrLen(ci,-1); /* Remove trailing comma. */
3688 return ci;
3689 }
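/* Usage sketch for the helper above: rendering a master node that is also
 * "myself" yields the string "myself,master", with the trailing comma
 * stripped by sdsIncrLen(). Illustration only. */
static sds exampleRenderNodeFlags(void) {
    sds ci = sdsempty();
    ci = representClusterNodeFlags(ci,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
    return ci; /* "myself,master" */
}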
3690
3691 /* Generate a CSV-like representation of the specified cluster node.
3692 * See clusterGenNodesDescription() top comment for more information.
3693 *
3694 * The function returns the string representation as an SDS string. */
3695 sds clusterGenNodeDescription(clusterNode *node) {
3696 int j, start;
3697 sds ci;
3698
3699 /* Node coordinates */
3700 ci = sdscatprintf(sdsempty(),"%.40s %s:%d ",
3701 node->name,
3702 node->ip,
3703 node->port);
3704
3705 /* Flags */
3706 ci = representClusterNodeFlags(ci, node->flags);
3707
3708 /* Slave of... or just "-" */
3709 if (node->slaveof)
3710 ci = sdscatprintf(ci," %.40s ",node->slaveof->name);
3711 else
3712 ci = sdscatlen(ci," - ",3);
3713
3714 /* Latency from the POV of this node, config epoch, link status */
3715 ci = sdscatprintf(ci,"%lld %lld %llu %s",
3716 (long long) node->ping_sent,
3717 (long long) node->pong_received,
3718 (unsigned long long) node->configEpoch,
3719 (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
3720 "connected" : "disconnected");
3721
3722 /* Slots served by this instance */
3723 start = -1;
3724 for (j = 0; j < CLUSTER_SLOTS; j++) {
3725 int bit;
3726
3727 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
3728 if (start == -1) start = j;
3729 }
3730 if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
3731 if (bit && j == CLUSTER_SLOTS-1) j++;
3732
3733 if (start == j-1) {
3734 ci = sdscatprintf(ci," %d",start);
3735 } else {
3736 ci = sdscatprintf(ci," %d-%d",start,j-1);
3737 }
3738 start = -1;
3739 }
3740 }
3741
3742 /* Just for MYSELF node we also dump info about slots that
3743 * we are migrating to other instances or importing from other
3744 * instances. */
3745 if (node->flags & CLUSTER_NODE_MYSELF) {
3746 for (j = 0; j < CLUSTER_SLOTS; j++) {
3747 if (server.cluster->migrating_slots_to[j]) {
3748 ci = sdscatprintf(ci," [%d->-%.40s]",j,
3749 server.cluster->migrating_slots_to[j]->name);
3750 } else if (server.cluster->importing_slots_from[j]) {
3751 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
3752 server.cluster->importing_slots_from[j]->name);
3753 }
3754 }
3755 }
3756 return ci;
3757 }
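/* For reference, a single line produced by the function above looks like the
 * following (values are made up, only the format matters):
 *
 * 07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:7000 myself,master - 0 1426238317239 2 connected 5461-10922
 */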
3758
3759 /* Generate a CSV-like representation of the nodes we are aware of,
3760 * including the "myself" node, and return an SDS string containing the
3761 * representation (it is up to the caller to free it).
3762 *
3763 * All the nodes matching at least one of the node flags specified in
3764 * "filter" are excluded from the output, so using zero as a filter will
3765 * include all the known nodes in the representation, including nodes in
3766 * the HANDSHAKE state.
3767 *
3768 * The representation obtained using this function is used for the output
3769 * of the CLUSTER NODES function, and as format for the cluster
3770 * configuration file (nodes.conf) for a given node. */
3771 sds clusterGenNodesDescription(int filter) {
3772 sds ci = sdsempty(), ni;
3773 dictIterator *di;
3774 dictEntry *de;
3775
3776 di = dictGetSafeIterator(server.cluster->nodes);
3777 while((de = dictNext(di)) != NULL) {
3778 clusterNode *node = dictGetVal(de);
3779
3780 if (node->flags & filter) continue;
3781 ni = clusterGenNodeDescription(node);
3782 ci = sdscatsds(ci,ni);
3783 sdsfree(ni);
3784 ci = sdscatlen(ci,"\n",1);
3785 }
3786 dictReleaseIterator(di);
3787 return ci;
3788 }
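/* Usage sketch for the function above, assuming the common case in which
 * nodes still in handshake should be excluded (for instance when a stable
 * view of the cluster is needed): passing CLUSTER_NODE_HANDSHAKE as the
 * filter drops them, while a zero filter keeps every known node. */
static sds exampleStableNodesDescription(void) {
    return clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
}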
3789
3790 /* -----------------------------------------------------------------------------
3791 * CLUSTER command
3792 * -------------------------------------------------------------------------- */
3793
3794 int getSlotOrReply(client *c, robj *o) {
3795 long long slot;
3796
3797 if (getLongLongFromObject(o,&slot) != C_OK ||
3798 slot < 0 || slot >= CLUSTER_SLOTS)
3799 {
3800 addReplyError(c,"Invalid or out of range slot");
3801 return -1;
3802 }
3803 return (int) slot;
3804 }
3805
3806 void clusterReplyMultiBulkSlots(client *c) {
3807 /* Format: 1) 1) start slot
3808 * 2) end slot
3809 * 3) 1) master IP
3810 * 2) master port
3811 * 3) node ID
3812 * 4) 1) replica IP
3813 * 2) replica port
3814 * 3) node ID
3815 * ... continued until done
3816 */
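/* Example reply as rendered by redis-cli (addresses and node IDs are
 * made up):
 *
 * 1) 1) (integer) 0
 *    2) (integer) 5460
 *    3) 1) "127.0.0.1"
 *       2) (integer) 7000
 *       3) "09dbe9720cda62f7865eabc5fd8857c5d2678366"
 *    4) 1) "127.0.0.1"
 *       2) (integer) 7003
 *       3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf"
 */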
3817
3818 int num_masters = 0;
3819 void *slot_replylen = addDeferredMultiBulkLength(c);
3820
3821 dictEntry *de;
3822 dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
3823 while((de = dictNext(di)) != NULL) {
3824 clusterNode *node = dictGetVal(de);
3825 int j = 0, start = -1;
3826
3827 /* Skip slaves (that are iterated when producing the output of their
3828 * master) and masters not serving any slot. */
3829 if (!nodeIsMaster(node) || node->numslots == 0) continue;
3830
3831 for (j = 0; j < CLUSTER_SLOTS; j++) {
3832 int bit, i;
3833
3834 if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
3835 if (start == -1) start = j;
3836 }
3837 if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
3838 int nested_elements = 3; /* slots (2) + master addr (1). */
3839 void *nested_replylen = addDeferredMultiBulkLength(c);
3840
3841 if (bit && j == CLUSTER_SLOTS-1) j++;
3842
3843 /* Emit the slot range: if it consists of a single slot, low and
3844 * high are the same value, otherwise emit low and high. */
3845 if (start == j-1) {
3846 addReplyLongLong(c, start); /* only one slot; low==high */
3847 addReplyLongLong(c, start);
3848 } else {
3849 addReplyLongLong(c, start); /* low */
3850 addReplyLongLong(c, j-1); /* high */
3851 }
3852 start = -1;
3853
3854 /* First node reply position is always the master */
3855 addReplyMultiBulkLen(c, 3);
3856 addReplyBulkCString(c, node->ip);
3857 addReplyLongLong(c, node->port);
3858 addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
3859
3860 /* Remaining nodes in reply are replicas for slot range */
3861 for (i = 0; i < node->numslaves; i++) {
3862 /* This loop is copy/pasted from clusterGenNodeDescription()
3863 * with modifications for per-slot node aggregation */
3864 if (nodeFailed(node->slaves[i])) continue;
3865 addReplyMultiBulkLen(c, 3);
3866 addReplyBulkCString(c, node->slaves[i]->ip);
3867 addReplyLongLong(c, node->slaves[i]->port);
3868 addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
3869 nested_elements++;
3870 }
3871 setDeferredMultiBulkLength(c, nested_replylen, nested_elements);
3872 num_masters++;
3873 }
3874 }
3875 }
3876 dictReleaseIterator(di);
3877 setDeferredMultiBulkLength(c, slot_replylen, num_masters);
3878 }
3879
3880 void clusterCommand(client *c) {
3881 if (server.cluster_enabled == 0) {
3882 addReplyError(c,"This instance has cluster support disabled");
3883 return;
3884 }
3885
3886 if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
3887 long long port;
3888
3889 if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
3890 addReplyErrorFormat(c,"Invalid TCP port specified: %s",
3891 (char*)c->argv[3]->ptr);
3892 return;
3893 }
3894
3895 if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
3896 errno == EINVAL)
3897 {
3898 addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
3899 (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
3900 } else {
3901 addReply(c,shared.ok);
3902 }
3903 } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
3904 /* CLUSTER NODES */
3905 robj *o;
3906 sds ci = clusterGenNodesDescription(0);
3907
3908 o = createObject(OBJ_STRING,ci);
3909 addReplyBulk(c,o);
3910 decrRefCount(o);
3911 } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
3912 /* CLUSTER MYID */
3913 addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
3914 } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
3915 /* CLUSTER SLOTS */
3916 clusterReplyMultiBulkSlots(c);
3917 } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
3918 /* CLUSTER FLUSHSLOTS */
3919 if (dictSize(server.db[0].dict) != 0) {
3920 addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
3921 return;
3922 }
3923 clusterDelNodeSlots(myself);
3924 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
3925 addReply(c,shared.ok);
3926 } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
3927 !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
3928 {
3929 /* CLUSTER ADDSLOTS <slot> [slot] ... */
3930 /* CLUSTER DELSLOTS <slot> [slot] ... */
3931 int j, slot;
3932 unsigned char *slots = zmalloc(CLUSTER_SLOTS);
3933 int del = !strcasecmp(c->argv[1]->ptr,"delslots");
3934
3935 memset(slots,0,CLUSTER_SLOTS);
3936 /* Check that all the arguments are parseable and that all the
3937 * slots are not already busy. */
3938 for (j = 2; j < c->argc; j++) {
3939 if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
3940 zfree(slots);
3941 return;
3942 }
3943 if (del && server.cluster->slots[slot] == NULL) {
3944 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
3945 zfree(slots);
3946 return;
3947 } else if (!del && server.cluster->slots[slot]) {
3948 addReplyErrorFormat(c,"Slot %d is already busy", slot);
3949 zfree(slots);
3950 return;
3951 }
3952 if (slots[slot]++ == 1) {
3953 addReplyErrorFormat(c,"Slot %d specified multiple times",
3954 (int)slot);
3955 zfree(slots);
3956 return;
3957 }
3958 }
3959 for (j = 0; j < CLUSTER_SLOTS; j++) {
3960 if (slots[j]) {
3961 int retval;
3962
3963 /* If this slot was set as importing we can clear this
3964 * state as now we are the real owner of the slot. */
3965 if (server.cluster->importing_slots_from[j])
3966 server.cluster->importing_slots_from[j] = NULL;
3967
3968 retval = del ? clusterDelSlot(j) :
3969 clusterAddSlot(myself,j);
3970 serverAssertWithInfo(c,NULL,retval == C_OK);
3971 }
3972 }
3973 zfree(slots);
3974 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
3975 addReply(c,shared.ok);
3976 } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
3977 /* SETSLOT 10 MIGRATING <node ID> */
3978 /* SETSLOT 10 IMPORTING <node ID> */
3979 /* SETSLOT 10 STABLE */
3980 /* SETSLOT 10 NODE <node ID> */
3981 int slot;
3982 clusterNode *n;
3983
3984 if (nodeIsSlave(myself)) {
3985 addReplyError(c,"Please use SETSLOT only with masters.");
3986 return;
3987 }
3988
3989 if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
3990
3991 if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
3992 if (server.cluster->slots[slot] != myself) {
3993 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
3994 return;
3995 }
3996 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
3997 addReplyErrorFormat(c,"I don't know about node %s",
3998 (char*)c->argv[4]->ptr);
3999 return;
4000 }
4001 server.cluster->migrating_slots_to[slot] = n;
4002 } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
4003 if (server.cluster->slots[slot] == myself) {
4004 addReplyErrorFormat(c,
4005 "I'm already the owner of hash slot %u",slot);
4006 return;
4007 }
4008 if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
4009 addReplyErrorFormat(c,"I don't know about node %s",
4010 (char*)c->argv[4]->ptr);
4011 return;
4012 }
4013 server.cluster->importing_slots_from[slot] = n;
4014 } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
4015 /* CLUSTER SETSLOT <SLOT> STABLE */
4016 server.cluster->importing_slots_from[slot] = NULL;
4017 server.cluster->migrating_slots_to[slot] = NULL;
4018 } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
4019 /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
4020 clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
4021
4022 if (!n) {
4023 addReplyErrorFormat(c,"Unknown node %s",
4024 (char*)c->argv[4]->ptr);
4025 return;
4026 }
4027 /* If this hash slot was served by 'myself' before the switch,
4028 * make sure there are no longer local keys for this hash slot. */
4029 if (server.cluster->slots[slot] == myself && n != myself) {
4030 if (countKeysInSlot(slot) != 0) {
4031 addReplyErrorFormat(c,
4032 "Can't assign hashslot %d to a different node "
4033 "while I still hold keys for this hash slot.", slot);
4034 return;
4035 }
4036 }
4037 /* If this slot is in migrating status but we have no keys
4038 * for it, assigning the slot to another node will clear
4039 * the migrating status. */
4040 if (countKeysInSlot(slot) == 0 &&
4041 server.cluster->migrating_slots_to[slot])
4042 server.cluster->migrating_slots_to[slot] = NULL;
4043
4044 /* If this node was importing this slot, assigning the slot to
4045 * itself also clears the importing status. */
4046 if (n == myself &&
4047 server.cluster->importing_slots_from[slot])
4048 {
4049 /* This slot was manually migrated, set this node configEpoch
4050 * to a new epoch so that the new version can be propagated
4051 * by the cluster.
4052 *
4053 * Note that if this ever results in a collision with another
4054 * node getting the same configEpoch, for example because a
4055 * failover happens at the same time we close the slot, the
4056 * configEpoch collision resolution will fix it assigning
4057 * a different epoch to each node. */
4058 if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
4059 serverLog(LL_WARNING,
4060 "configEpoch updated after importing slot %d", slot);
4061 }
4062 server.cluster->importing_slots_from[slot] = NULL;
4063 }
4064 clusterDelSlot(slot);
4065 clusterAddSlot(n,slot);
4066 } else {
4067 addReplyError(c,
4068 "Invalid CLUSTER SETSLOT action or number of arguments");
4069 return;
4070 }
4071 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
4072 addReply(c,shared.ok);
4073 } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
4074 /* CLUSTER BUMPEPOCH */
4075 int retval = clusterBumpConfigEpochWithoutConsensus();
4076 sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
4077 (retval == C_OK) ? "BUMPED" : "STILL",
4078 (unsigned long long) myself->configEpoch);
4079 addReplySds(c,reply);
4080 } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
4081 /* CLUSTER INFO */
4082 char *statestr[] = {"ok","fail","needhelp"};
4083 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
4084 uint64_t myepoch;
4085 int j;
4086
4087 for (j = 0; j < CLUSTER_SLOTS; j++) {
4088 clusterNode *n = server.cluster->slots[j];
4089
4090 if (n == NULL) continue;
4091 slots_assigned++;
4092 if (nodeFailed(n)) {
4093 slots_fail++;
4094 } else if (nodeTimedOut(n)) {
4095 slots_pfail++;
4096 } else {
4097 slots_ok++;
4098 }
4099 }
4100
4101 myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
4102 myself->slaveof->configEpoch : myself->configEpoch;
4103
4104 sds info = sdscatprintf(sdsempty(),
4105 "cluster_state:%s\r\n"
4106 "cluster_slots_assigned:%d\r\n"
4107 "cluster_slots_ok:%d\r\n"
4108 "cluster_slots_pfail:%d\r\n"
4109 "cluster_slots_fail:%d\r\n"
4110 "cluster_known_nodes:%lu\r\n"
4111 "cluster_size:%d\r\n"
4112 "cluster_current_epoch:%llu\r\n"
4113 "cluster_my_epoch:%llu\r\n"
4114 "cluster_stats_messages_sent:%lld\r\n"
4115 "cluster_stats_messages_received:%lld\r\n"
4116 , statestr[server.cluster->state],
4117 slots_assigned,
4118 slots_ok,
4119 slots_pfail,
4120 slots_fail,
4121 dictSize(server.cluster->nodes),
4122 server.cluster->size,
4123 (unsigned long long) server.cluster->currentEpoch,
4124 (unsigned long long) myepoch,
4125 server.cluster->stats_bus_messages_sent,
4126 server.cluster->stats_bus_messages_received
4127 );
4128 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
4129 (unsigned long)sdslen(info)));
4130 addReplySds(c,info);
4131 addReply(c,shared.crlf);
4132 } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
4133 int retval = clusterSaveConfig(1);
4134
4135 if (retval == 0)
4136 addReply(c,shared.ok);
4137 else
4138 addReplyErrorFormat(c,"error saving the cluster node config: %s",
4139 strerror(errno));
4140 } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
4141 /* CLUSTER KEYSLOT <key> */
4142 sds key = c->argv[2]->ptr;
4143
4144 addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
4145 } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
4146 /* CLUSTER COUNTKEYSINSLOT <slot> */
4147 long long slot;
4148
4149 if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4150 return;
4151 if (slot < 0 || slot >= CLUSTER_SLOTS) {
4152 addReplyError(c,"Invalid slot");
4153 return;
4154 }
4155 addReplyLongLong(c,countKeysInSlot(slot));
4156 } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
4157 /* CLUSTER GETKEYSINSLOT <slot> <count> */
4158 long long maxkeys, slot;
4159 unsigned int numkeys, j;
4160 robj **keys;
4161
4162 if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
4163 return;
4164 if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
4165 != C_OK)
4166 return;
4167 if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
4168 addReplyError(c,"Invalid slot or number of keys");
4169 return;
4170 }
4171
4172 keys = zmalloc(sizeof(robj*)*maxkeys);
4173 numkeys = getKeysInSlot(slot, keys, maxkeys);
4174 addReplyMultiBulkLen(c,numkeys);
4175 for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
4176 zfree(keys);
4177 } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
4178 /* CLUSTER FORGET <NODE ID> */
4179 clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4180
4181 if (!n) {
4182 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4183 return;
4184 } else if (n == myself) {
4185 addReplyError(c,"I tried hard but I can't forget myself...");
4186 return;
4187 } else if (nodeIsSlave(myself) && myself->slaveof == n) {
4188 addReplyError(c,"Can't forget my master!");
4189 return;
4190 }
4191 clusterBlacklistAddNode(n);
4192 clusterDelNode(n);
4193 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4194 CLUSTER_TODO_SAVE_CONFIG);
4195 addReply(c,shared.ok);
4196 } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
4197 /* CLUSTER REPLICATE <NODE ID> */
4198 clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4199
4200 /* Lookup the specified node in our table. */
4201 if (!n) {
4202 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4203 return;
4204 }
4205
4206 /* I can't replicate myself. */
4207 if (n == myself) {
4208 addReplyError(c,"Can't replicate myself");
4209 return;
4210 }
4211
4212 /* Can't replicate a slave. */
4213 if (nodeIsSlave(n)) {
4214 addReplyError(c,"I can only replicate a master, not a slave.");
4215 return;
4216 }
4217
4218 /* If the instance is currently a master, it should have no assigned
4219 * slots nor keys in order to accept replicating some other node.
4220 * Slaves can switch to another master without issues. */
4221 if (nodeIsMaster(myself) &&
4222 (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
4223 addReplyError(c,
4224 "To set a master the node must be empty and "
4225 "without assigned slots.");
4226 return;
4227 }
4228
4229 /* Set the master. */
4230 clusterSetMaster(n);
4231 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
4232 addReply(c,shared.ok);
4233 } else if (!strcasecmp(c->argv[1]->ptr,"slaves") && c->argc == 3) {
4234 /* CLUSTER SLAVES <NODE ID> */
4235 clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4236 int j;
4237
4238 /* Lookup the specified node in our table. */
4239 if (!n) {
4240 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4241 return;
4242 }
4243
4244 if (nodeIsSlave(n)) {
4245 addReplyError(c,"The specified node is not a master");
4246 return;
4247 }
4248
4249 addReplyMultiBulkLen(c,n->numslaves);
4250 for (j = 0; j < n->numslaves; j++) {
4251 sds ni = clusterGenNodeDescription(n->slaves[j]);
4252 addReplyBulkCString(c,ni);
4253 sdsfree(ni);
4254 }
4255 } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
4256 c->argc == 3)
4257 {
4258 /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
4259 clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
4260
4261 if (!n) {
4262 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
4263 return;
4264 } else {
4265 addReplyLongLong(c,clusterNodeFailureReportsCount(n));
4266 }
4267 } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
4268 (c->argc == 2 || c->argc == 3))
4269 {
4270 /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
4271 int force = 0, takeover = 0;
4272
4273 if (c->argc == 3) {
4274 if (!strcasecmp(c->argv[2]->ptr,"force")) {
4275 force = 1;
4276 } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
4277 takeover = 1;
4278 force = 1; /* Takeover also implies force. */
4279 } else {
4280 addReply(c,shared.syntaxerr);
4281 return;
4282 }
4283 }
4284
4285 /* Check preconditions. */
4286 if (nodeIsMaster(myself)) {
4287 addReplyError(c,"You should send CLUSTER FAILOVER to a slave");
4288 return;
4289 } else if (myself->slaveof == NULL) {
4290 addReplyError(c,"I'm a slave but my master is unknown to me");
4291 return;
4292 } else if (!force &&
4293 (nodeFailed(myself->slaveof) ||
4294 myself->slaveof->link == NULL))
4295 {
4296 addReplyError(c,"Master is down or failed, "
4297 "please use CLUSTER FAILOVER FORCE");
4298 return;
4299 }
4300 resetManualFailover();
4301 server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
4302
4303 if (takeover) {
4304 /* A takeover does not perform any initial check. It just
4305 * generates a new configuration epoch for this node without
4306 * consensus, claims the master's slots, and broadcasts the new
4307 * configuration. */
4308 serverLog(LL_WARNING,"Taking over the master (user request).");
4309 clusterBumpConfigEpochWithoutConsensus();
4310 clusterFailoverReplaceYourMaster();
4311 } else if (force) {
4312 /* If this is a forced failover, we don't need to talk with our
4313 * master to agree about the offset. We just fail over, taking over
4314 * the master without coordination. */
4315 serverLog(LL_WARNING,"Forced failover user request accepted.");
4316 server.cluster->mf_can_start = 1;
4317 } else {
4318 serverLog(LL_WARNING,"Manual failover user request accepted.");
4319 clusterSendMFStart(myself->slaveof);
4320 }
4321 addReply(c,shared.ok);
4322 } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
4323 {
4324 /* CLUSTER SET-CONFIG-EPOCH <epoch>
4325 *
4326 * The user is allowed to set the config epoch only when a node is
4327 * totally fresh: no config epoch, no other known node, and so forth.
4328 * This happens at cluster creation time to start with a cluster where
4329 * every node has a different config epoch, without relying on the conflict
4330 * resolution system, which is too slow when a big cluster is created. */
4331 long long epoch;
4332
4333 if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
4334 return;
4335
4336 if (epoch < 0) {
4337 addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
4338 } else if (dictSize(server.cluster->nodes) > 1) {
4339 addReplyError(c,"The user can assign a config epoch only when the "
4340 "node does not know any other node.");
4341 } else if (myself->configEpoch != 0) {
4342 addReplyError(c,"Node config epoch is already non-zero");
4343 } else {
4344 myself->configEpoch = epoch;
4345 serverLog(LL_WARNING,
4346 "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
4347 (unsigned long long) myself->configEpoch);
4348
4349 if (server.cluster->currentEpoch < (uint64_t)epoch)
4350 server.cluster->currentEpoch = epoch;
4351 /* No need to fsync the config here since in the unlucky event
4352 * of a failure to persist the config, the conflict resolution code
4353 * will assign a unique config epoch to this node. */
4354 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
4355 CLUSTER_TODO_SAVE_CONFIG);
4356 addReply(c,shared.ok);
4357 }
4358 } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
4359 (c->argc == 2 || c->argc == 3))
4360 {
4361 /* CLUSTER RESET [SOFT|HARD] */
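/* Illustrative usage (the port is made up). A soft reset keeps the node ID and
 * the epochs, while a hard reset also regenerates the node ID and resets the
 * epochs to zero:
 *
 *   redis-cli -p 7000 CLUSTER RESET SOFT
 *   redis-cli -p 7000 CLUSTER RESET HARD
 */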
4362 int hard = 0;
4363
4364 /* Parse soft/hard argument. Default is soft. */
4365 if (c->argc == 3) {
4366 if (!strcasecmp(c->argv[2]->ptr,"hard")) {
4367 hard = 1;
4368 } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
4369 hard = 0;
4370 } else {
4371 addReply(c,shared.syntaxerr);
4372 return;
4373 }
4374 }
4375
4376 /* Slaves can be reset while they hold data, but master nodes
4377 * must be empty to be reset. */
4378 if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
4379 addReplyError(c,"CLUSTER RESET can't be called with "
4380 "master nodes containing keys");
4381 return;
4382 }
4383 clusterReset(hard);
4384 addReply(c,shared.ok);
4385 } else {
4386 addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
4387 }
4388 }
4389
4390 /* -----------------------------------------------------------------------------
4391 * DUMP, RESTORE and MIGRATE commands
4392 * -------------------------------------------------------------------------- */
4393
4394 /* Generates a DUMP-format representation of the object 'o', adding it to the
4395 * io stream pointed to by 'payload'. This function can't fail. */
4396 void createDumpPayload(rio *payload, robj *o) {
4397 unsigned char buf[2];
4398 uint64_t crc;
4399
4400 /* Serialize the object in an RDB-like format. It consists of an object type
4401 * byte followed by the serialized object. This is understood by RESTORE. */
4402 rioInitWithBuffer(payload,sdsempty());
4403 serverAssert(rdbSaveObjectType(payload,o));
4404 serverAssert(rdbSaveObject(payload,o));
4405
4406 /* Write the footer; this is what it looks like:
4407 * ----------------+---------------------+---------------+
4408 * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
4409 * ----------------+---------------------+---------------+
4410 * RDB version and CRC are both in little endian.
4411 */
4412
4413 /* RDB version */
4414 buf[0] = RDB_VERSION & 0xff;
4415 buf[1] = (RDB_VERSION >> 8) & 0xff;
4416 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
4417
4418 /* CRC64 */
4419 crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
4420 sdslen(payload->io.buffer.ptr));
4421 memrev64ifbe(&crc);
4422 payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
4423 }
4424
4425 /* Verify that the RDB version of the dump payload is not greater than the one
4426 * supported by this Redis instance, and that the checksum is ok.
4427 * If the DUMP payload looks valid, C_OK is returned, otherwise C_ERR
4428 * is returned. */
4429 int verifyDumpPayload(unsigned char *p, size_t len) {
4430 unsigned char *footer;
4431 uint16_t rdbver;
4432 uint64_t crc;
4433
4434 /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
4435 if (len < 10) return C_ERR;
4436 footer = p+(len-10);
4437
4438 /* Verify RDB version */
4439 rdbver = (footer[1] << 8) | footer[0];
4440 if (rdbver > RDB_VERSION) return C_ERR;
4441
4442 /* Verify CRC64 */
4443 crc = crc64(0,p,len-8);
4444 memrev64ifbe(&crc);
4445 return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
4446 }
4447
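/* Illustrative only: a minimal sketch (not part of Redis) showing how the two
 * functions above compose. It serializes 'o' with createDumpPayload() and
 * sanity checks the result with verifyDumpPayload(), returning the payload as
 * an sds string the caller must free, or NULL on (unexpected) corruption. */
static sds createVerifiedDumpPayload(robj *o) {
    rio payload;
    sds buf;

    createDumpPayload(&payload,o);
    buf = payload.io.buffer.ptr;
    if (verifyDumpPayload((unsigned char*)buf,sdslen(buf)) == C_ERR) {
        sdsfree(buf);
        return NULL;
    }
    return buf;
}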
4448 /* DUMP keyname
4449 * DUMP is actually not used by Redis Cluster but it is the obvious
4450 * complement of RESTORE and can be useful for different applications. */
4451 void dumpCommand(client *c) {
4452 robj *o, *dumpobj;
4453 rio payload;
4454
4455 /* Check if the key is here. */
4456 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
4457 addReply(c,shared.nullbulk);
4458 return;
4459 }
4460
4461 /* Create the DUMP encoded representation. */
4462 createDumpPayload(&payload,o);
4463
4464 /* Transfer to the client */
4465 dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
4466 addReplyBulk(c,dumpobj);
4467 decrRefCount(dumpobj);
4468 return;
4469 }
4470
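/* Illustrative DUMP/RESTORE round trip from redis-cli (the DUMP reply is an
 * opaque binary blob, elided here, whose exact bytes depend on the RDB version
 * and on the trailing CRC64):
 *
 *   127.0.0.1:6379> SET foo bar
 *   OK
 *   127.0.0.1:6379> DUMP foo
 *   "..."
 *   127.0.0.1:6379> RESTORE foo2 0 "..."
 *   OK
 */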
4471 /* RESTORE key ttl serialized-value [REPLACE] */
4472 void restoreCommand(client *c) {
4473 long long ttl;
4474 rio payload;
4475 int j, type, replace = 0;
4476 robj *obj;
4477
4478 /* Parse additional options */
4479 for (j = 4; j < c->argc; j++) {
4480 if (!strcasecmp(c->argv[j]->ptr,"replace")) {
4481 replace = 1;
4482 } else {
4483 addReply(c,shared.syntaxerr);
4484 return;
4485 }
4486 }
4487
4488 /* Make sure this key does not already exist here... */
4489 if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
4490 addReply(c,shared.busykeyerr);
4491 return;
4492 }
4493
4494 /* Check if the TTL value makes sense */
4495 if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
4496 return;
4497 } else if (ttl < 0) {
4498 addReplyError(c,"Invalid TTL value, must be >= 0");
4499 return;
4500 }
4501
4502 /* Verify RDB version and data checksum. */
4503 if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
4504 {
4505 addReplyError(c,"DUMP payload version or checksum are wrong");
4506 return;
4507 }
4508
4509 rioInitWithBuffer(&payload,c->argv[3]->ptr);
4510 if (((type = rdbLoadObjectType(&payload)) == -1) ||
4511 ((obj = rdbLoadObject(type,&payload)) == NULL))
4512 {
4513 addReplyError(c,"Bad data format");
4514 return;
4515 }
4516
4517 /* Remove the old key if needed. */
4518 if (replace) dbDelete(c->db,c->argv[1]);
4519
4520 /* Create the key and set the TTL if any */
4521 dbAdd(c->db,c->argv[1],obj);
4522 if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);
4523 signalModifiedKey(c->db,c->argv[1]);
4524 addReply(c,shared.ok);
4525 server.dirty++;
4526 }
4527
4528 /* MIGRATE socket cache implementation.
4529 *
4530 * We keep a map between host:port pairs and the TCP sockets we used to
4531 * connect to those instances recently.
4532 * These sockets are closed when the max number we cache is reached, and also
4533 * in serverCron() when they have been unused for more than a few seconds. */
4534 #define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
4535 #define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
4536
4537 typedef struct migrateCachedSocket {
4538 int fd;
4539 long last_dbid;
4540 time_t last_use_time;
4541 } migrateCachedSocket;
4542
4543 /* Return a migrateCachedSocket containing a TCP socket connected with the
4544 * target instance, possibly returning a cached one.
4545 *
4546 * This function is responsible for sending errors to the client if a
4547 * connection can't be established. In this case NULL is returned.
4548 * Otherwise on success the socket is returned, and the caller should not
4549 * attempt to free it after usage.
4550 *
4551 * If the caller detects an error while using the socket, migrateCloseSocket()
4552 * should be called so that the connection will be created from scratch
4553 * the next time. */
4554 migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
4555 int fd;
4556 sds name = sdsempty();
4557 migrateCachedSocket *cs;
4558
4559 /* Check if we have an already cached socket for this ip:port pair. */
4560 name = sdscatlen(name,host->ptr,sdslen(host->ptr));
4561 name = sdscatlen(name,":",1);
4562 name = sdscatlen(name,port->ptr,sdslen(port->ptr));
4563 cs = dictFetchValue(server.migrate_cached_sockets,name);
4564 if (cs) {
4565 sdsfree(name);
4566 cs->last_use_time = server.unixtime;
4567 return cs;
4568 }
4569
4570 /* No cached socket, create one. */
4571 if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
4572 /* Too many items, drop one at random. */
4573 dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
4574 cs = dictGetVal(de);
4575 close(cs->fd);
4576 zfree(cs);
4577 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
4578 }
4579
4580 /* Create the socket */
4581 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
4582 atoi(c->argv[2]->ptr));
4583 if (fd == -1) {
4584 sdsfree(name);
4585 addReplyErrorFormat(c,"Can't connect to target node: %s",
4586 server.neterr);
4587 return NULL;
4588 }
4589 anetEnableTcpNoDelay(server.neterr,fd);
4590
4591 /* Check if it connects within the specified timeout. */
4592 if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
4593 sdsfree(name);
4594 addReplySds(c,
4595 sdsnew("-IOERR error or timeout connecting to the client\r\n"));
4596 close(fd);
4597 return NULL;
4598 }
4599
4600 /* Add to the cache and return it to the caller. */
4601 cs = zmalloc(sizeof(*cs));
4602 cs->fd = fd;
4603 cs->last_dbid = -1;
4604 cs->last_use_time = server.unixtime;
4605 dictAdd(server.migrate_cached_sockets,name,cs);
4606 return cs;
4607 }
4608
4609 /* Free a migrate cached connection. */
4610 void migrateCloseSocket(robj *host, robj *port) {
4611 sds name = sdsempty();
4612 migrateCachedSocket *cs;
4613
4614 name = sdscatlen(name,host->ptr,sdslen(host->ptr));
4615 name = sdscatlen(name,":",1);
4616 name = sdscatlen(name,port->ptr,sdslen(port->ptr));
4617 cs = dictFetchValue(server.migrate_cached_sockets,name);
4618 if (!cs) {
4619 sdsfree(name);
4620 return;
4621 }
4622
4623 close(cs->fd);
4624 zfree(cs);
4625 dictDelete(server.migrate_cached_sockets,name);
4626 sdsfree(name);
4627 }
4628
4629 void migrateCloseTimedoutSockets(void) {
4630 dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
4631 dictEntry *de;
4632
4633 while((de = dictNext(di)) != NULL) {
4634 migrateCachedSocket *cs = dictGetVal(de);
4635
4636 if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
4637 close(cs->fd);
4638 zfree(cs);
4639 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
4640 }
4641 }
4642 dictReleaseIterator(di);
4643 }
4644
4645 /* MIGRATE host port key dbid timeout [COPY | REPLACE]
4646 *
4647 * Or, in the multiple keys form:
4648 *
4649 * MIGRATE host port "" dbid timeout [COPY | REPLACE] KEYS key1 key2 ... keyN */
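/* Illustrative invocations (host, port and key names are made up):
 *
 *   MIGRATE 192.0.2.10 6380 user:1000 0 5000
 *   MIGRATE 192.0.2.10 6380 "" 0 5000 REPLACE KEYS user:1 user:2 user:3
 */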
4650 void migrateCommand(client *c) {
4651 migrateCachedSocket *cs;
4652 int copy, replace, j;
4653 long timeout;
4654 long dbid;
4655 long long ttl, expireat;
4656 robj **ov = NULL; /* Objects to migrate. */
4657 robj **kv = NULL; /* Key names. */
4658 robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
4659 rio cmd, payload;
4660 int may_retry = 1;
4661 int write_error = 0;
4662
4663 /* To support the KEYS option we need the following additional state. */
4664 int first_key = 3; /* Argument index of the first key. */
4665 int num_keys = 1; /* By default only migrate the 'key' argument. */
4666
4667 /* Initialization */
4668 copy = 0;
4669 replace = 0;
4670 ttl = 0;
4671
4672 /* Parse additional options */
4673 for (j = 6; j < c->argc; j++) {
4674 if (!strcasecmp(c->argv[j]->ptr,"copy")) {
4675 copy = 1;
4676 } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
4677 replace = 1;
4678 } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
4679 if (sdslen(c->argv[3]->ptr) != 0) {
4680 addReplyError(c,
4681 "When using MIGRATE KEYS option, the key argument"
4682 " must be set to the empty string");
4683 return;
4684 }
4685 first_key = j+1;
4686 num_keys = c->argc - j - 1;
4687 break; /* All the remaining args are keys. */
4688 } else {
4689 addReply(c,shared.syntaxerr);
4690 return;
4691 }
4692 }
4693
4694 /* Sanity check */
4695 if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
4696 getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
4697 {
4698 return;
4699 }
4700 if (timeout <= 0) timeout = 1000;
4701
4702 /* Check if the keys are here. If at least one key exists, migrate it;
4703 * otherwise, if all the keys are missing, reply with "NOKEY" to signal
4704 * the caller there was nothing to migrate. We don't return an error in
4705 * this case, since often this is due to a normal condition like the key
4706 * expiring in the meantime. */
4707 ov = zrealloc(ov,sizeof(robj*)*num_keys);
4708 kv = zrealloc(kv,sizeof(robj*)*num_keys);
4709 int oi = 0;
4710
4711 for (j = 0; j < num_keys; j++) {
4712 if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
4713 kv[oi] = c->argv[first_key+j];
4714 oi++;
4715 }
4716 }
4717 num_keys = oi;
4718 if (num_keys == 0) {
4719 zfree(ov); zfree(kv);
4720 addReplySds(c,sdsnew("+NOKEY\r\n"));
4721 return;
4722 }
4723
4724 try_again:
4725 write_error = 0;
4726
4727 /* Connect */
4728 cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
4729 if (cs == NULL) {
4730 zfree(ov); zfree(kv);
4731 return; /* error sent to the client by migrateGetSocket() */
4732 }
4733
4734 rioInitWithBuffer(&cmd,sdsempty());
4735
4736 /* Send the SELECT command if the current DB is not already selected. */
4737 int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
4738 if (select) {
4739 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
4740 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
4741 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
4742 }
4743
4744 /* Create RESTORE payload and generate the protocol to call the command. */
4745 for (j = 0; j < num_keys; j++) {
/* Reset the TTL for each key, otherwise a key without an expire would
 * inherit the TTL computed for the previous key in this loop. */
ttl = 0;
4746 expireat = getExpire(c->db,kv[j]);
4747 if (expireat != -1) {
4748 ttl = expireat-mstime();
4749 if (ttl < 1) ttl = 1;
4750 }
4751 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
4752 if (server.cluster_enabled)
4753 serverAssertWithInfo(c,NULL,
4754 rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
4755 else
4756 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
4757 serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
4758 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
4759 sdslen(kv[j]->ptr)));
4760 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
4761
4762 /* Emit the payload argument, that is the serialized object using
4763 * the DUMP format. */
4764 createDumpPayload(&payload,ov[j]);
4765 serverAssertWithInfo(c,NULL,
4766 rioWriteBulkString(&cmd,payload.io.buffer.ptr,
4767 sdslen(payload.io.buffer.ptr)));
4768 sdsfree(payload.io.buffer.ptr);
4769
4770 /* Add the REPLACE option to the RESTORE command if it was specified
4771 * as a MIGRATE option. */
4772 if (replace)
4773 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
4774 }
4775
4776 /* Transfer the query to the other node in 64K chunks. */
4777 errno = 0;
4778 {
4779 sds buf = cmd.io.buffer.ptr;
4780 size_t pos = 0, towrite;
4781 int nwritten = 0;
4782
4783 while ((towrite = sdslen(buf)-pos) > 0) {
4784 towrite = (towrite > (64*1024) ? (64*1024) : towrite);
4785 nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
4786 if (nwritten != (signed)towrite) {
4787 write_error = 1;
4788 goto socket_err;
4789 }
4790 pos += nwritten;
4791 }
4792 }
4793
4794 char buf1[1024]; /* Select reply. */
4795 char buf2[1024]; /* Restore reply. */
4796
4797 /* Read the SELECT reply if needed. */
4798 if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
4799 goto socket_err;
4800
4801 /* Read the RESTORE replies. */
4802 int error_from_target = 0;
4803 int socket_error = 0;
4804 int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
4805
4806 if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
4807
4808 for (j = 0; j < num_keys; j++) {
4809 if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
4810 socket_error = 1;
4811 break;
4812 }
4813 if ((select && buf1[0] == '-') || buf2[0] == '-') {
4814 /* On error assume that last_dbid is no longer valid. */
4815 if (!error_from_target) {
4816 cs->last_dbid = -1;
4817 addReplyErrorFormat(c,"Target instance replied with error: %s",
4818 (select && buf1[0] == '-') ? buf1+1 : buf2+1);
4819 error_from_target = 1;
4820 }
4821 } else {
4822 if (!copy) {
4823 /* No COPY option: remove the local key, signal the change. */
4824 dbDelete(c->db,kv[j]);
4825 signalModifiedKey(c->db,kv[j]);
4826 server.dirty++;
4827
4828 /* Populate the argument vector to replace the old one. */
4829 newargv[del_idx++] = kv[j];
4830 incrRefCount(kv[j]);
4831 }
4832 }
4833 }
4834
4835 /* On socket error, if we want to retry, do it now before rewriting the
4836 * command vector. We only retry if we are sure nothing was processed
4837 * and we failed to read the first reply (j == 0 test). */
4838 if (!error_from_target && socket_error && j == 0 && may_retry &&
4839 errno != ETIMEDOUT)
4840 {
4841 goto socket_err; /* A retry is guaranteed because of tested conditions.*/
4842 }
4843
4844 if (!copy) {
4845 /* Translate MIGRATE as DEL for replication/AOF. */
4846 if (del_idx > 1) {
4847 newargv[0] = createStringObject("DEL",3);
4848 /* Note that the following call takes ownership of newargv. */
4849 replaceClientCommandVector(c,del_idx,newargv);
4850 } else {
4851 /* No key transfer acknowledged, no need to rewrite as DEL. */
4852 zfree(newargv);
4853 }
4854 newargv = NULL; /* Make it safe to call zfree() on it in the future. */
4855 }
4856
4857 /* If we are here and a socket error happened, we don't want to retry.
4858 * Just signal the problem to the client, but only do it if we didn't
4859 * already queue a different error reported by the destination server. */
4860 if (!error_from_target && socket_error) {
4861 may_retry = 0;
4862 goto socket_err;
4863 }
4864
4865 if (!error_from_target) {
4866 /* Success! Update the last_dbid in migrateCachedSocket, so that we can
4867 * avoid SELECT the next time if the target DB is the same. Reply +OK. */
4868 cs->last_dbid = dbid;
4869 addReply(c,shared.ok);
4870 } else {
4871 /* On error we already replied to the client in the for loop above, and
4872 * set last_dbid to -1 to force a SELECT the next time. */
4873 }
4874
4875 sdsfree(cmd.io.buffer.ptr);
4876 zfree(ov); zfree(kv); zfree(newargv);
4877 if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
4878 return;
4879
4880 /* On socket errors we close the cached socket and try again.
4881 * It is very common for the cached socket to get closed on the other end;
4882 * if simply reopening it works, it's a shame to report the error to the caller. */
4883 socket_err:
4884 /* Cleanup we want to perform in both the retry and no retry case.
4885 * Note: Closing the migrate socket will also force SELECT next time. */
4886 sdsfree(cmd.io.buffer.ptr);
4887 migrateCloseSocket(c->argv[1],c->argv[2]);
4888 zfree(newargv);
4889 newargv = NULL; /* This will get reallocated on retry. */
4890
4891 /* Retry only if it's not a timeout, we never attempted a retry,
4892 * and the code jumping here did not set may_retry to zero. */
4893 if (errno != ETIMEDOUT && may_retry) {
4894 may_retry = 0;
4895 goto try_again;
4896 }
4897
4898 /* Cleanup we want to do if no retry is attempted. */
4899 zfree(ov); zfree(kv);
4900 addReplySds(c,
4901 sdscatprintf(sdsempty(),
4902 "-IOERR error or timeout %s to target instance\r\n",
4903 write_error ? "writing" : "reading"));
4904 return;
4905 }
4906
4907 /* -----------------------------------------------------------------------------
4908 * Cluster functions related to serving / redirecting clients
4909 * -------------------------------------------------------------------------- */
4910
4911 /* The ASKING command is required after a -ASK redirection.
4912 * The client should issue ASKING before actually sending the command to
4913 * the target instance. See the Redis Cluster specification for more
4914 * information. */
4915 void askingCommand(client *c) {
4916 if (server.cluster_enabled == 0) {
4917 addReplyError(c,"This instance has cluster support disabled");
4918 return;
4919 }
4920 c->flags |= CLIENT_ASKING;
4921 addReply(c,shared.ok);
4922 }
4923
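/* Illustrative ASK redirection flow as seen by a client (slot and address are
 * made up):
 *
 *   GET foo
 *   -ASK 3999 127.0.0.1:6381        <- node that is migrating the slot away
 *   (the client connects to 127.0.0.1:6381 and then sends:)
 *   ASKING
 *   +OK
 *   GET foo
 *   "bar"
 */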
4924 /* The READONLY command is used by clients to enter the read-only mode.
4925 * In this mode slaves will not redirect clients as long as clients use
4926 * read-only commands on keys that are served by the slave's master. */
4927 void readonlyCommand(client *c) {
4928 if (server.cluster_enabled == 0) {
4929 addReplyError(c,"This instance has cluster support disabled");
4930 return;
4931 }
4932 c->flags |= CLIENT_READONLY;
4933 addReply(c,shared.ok);
4934 }
4935
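/* Illustrative session against a slave (slot and addresses are made up):
 *
 *   127.0.0.1:6380> GET foo
 *   -MOVED 3999 127.0.0.1:6379
 *   127.0.0.1:6380> READONLY
 *   +OK
 *   127.0.0.1:6380> GET foo
 *   "bar"
 */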
4936 /* The READWRITE command just clears the READONLY command state. */
4937 void readwriteCommand(client *c) {
4938 c->flags &= ~CLIENT_READONLY;
4939 addReply(c,shared.ok);
4940 }
4941
4942 /* Return the pointer to the cluster node that is able to serve the command.
4943 * For the function to succeed the command should only target either:
4944 *
4945 * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
4946 * 2) Multiple keys in the same hash slot, while the slot is stable (no
4947 * resharding in progress).
4948 *
4949 * On success the function returns the node that is able to serve the request.
4950 * If the node is not 'myself' a redirection must be performed. The kind of
4951 * redirection is specified setting the integer passed by reference
4952 * 'error_code', which will be set to CLUSTER_REDIR_ASK or
4953 * CLUSTER_REDIR_MOVED.
4954 *
4955 * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
4956 *
4957 * If the command fails NULL is returned, and the reason for the failure is
4958 * provided via 'error_code', which will be set to:
4959 *
4960 * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
4961 * don't belong to the same hash slot.
4962 *
4963 * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
4964 * belonging to the same slot, but the slot is not stable (in migration or
4965 * importing state, likely because a resharding is in progress).
4966 *
4967 * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
4968 * not bound to any node. In this case the cluster global state should be
4969 * already "down" but it is fragile to rely on the update of the global state,
4970 * so we also handle it here.
4971 *
4972 * CLUSTER_REDIR_DOWN_STATE if the cluster is down but the user attempts to
4973 * execute a command that addresses one or more keys. */
4974 clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
4975 clusterNode *n = NULL;
4976 robj *firstkey = NULL;
4977 int multiple_keys = 0;
4978 multiState *ms, _ms;
4979 multiCmd mc;
4980 int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
4981
4982 /* Set error code optimistically for the base case. */
4983 if (error_code) *error_code = CLUSTER_REDIR_NONE;
4984
4985 /* We handle all the cases as if they were EXEC commands, so we have
4986 * a common code path for everything. */
4987 if (cmd->proc == execCommand) {
4988 /* If CLIENT_MULTI flag is not set EXEC is just going to return an
4989 * error. */
4990 if (!(c->flags & CLIENT_MULTI)) return myself;
4991 ms = &c->mstate;
4992 } else {
4993 /* If the client is not in MULTI/EXEC state, create a fake Multi State
4994 * structure, so that we have a single code path below. */
4996 ms = &_ms;
4997 _ms.commands = &mc;
4998 _ms.count = 1;
4999 mc.argv = argv;
5000 mc.argc = argc;
5001 mc.cmd = cmd;
5002 }
5003
5004 /* Check that all the keys are in the same hash slot, and obtain this
5005 * slot and the node associated. */
5006 for (i = 0; i < ms->count; i++) {
5007 struct redisCommand *mcmd;
5008 robj **margv;
5009 int margc, *keyindex, numkeys, j;
5010
5011 mcmd = ms->commands[i].cmd;
5012 margc = ms->commands[i].argc;
5013 margv = ms->commands[i].argv;
5014
5015 keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
5016 for (j = 0; j < numkeys; j++) {
5017 robj *thiskey = margv[keyindex[j]];
5018 int thisslot = keyHashSlot((char*)thiskey->ptr,
5019 sdslen(thiskey->ptr));
5020
5021 if (firstkey == NULL) {
5022 /* This is the first key we see. Check what the slot
5023 * and the node are. */
5024 firstkey = thiskey;
5025 slot = thisslot;
5026 n = server.cluster->slots[slot];
5027
5028 /* Error: If a slot is not served, we are in "cluster down"
5029 * state. However the state is yet to be updated, so this was
5030 * not trapped earlier in processCommand(). Report the same
5031 * error to the client. */
5032 if (n == NULL) {
5033 getKeysFreeResult(keyindex);
5034 if (error_code)
5035 *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
5036 return NULL;
5037 }
5038
5039 /* If we are migrating or importing this slot, we need to check
5040 * if we have all the keys in the request (the only way we
5041 * can safely serve the request, otherwise we return a TRYAGAIN
5042 * error). To do so we set the importing/migrating state and
5043 * increment a counter for every missing key. */
5044 if (n == myself &&
5045 server.cluster->migrating_slots_to[slot] != NULL)
5046 {
5047 migrating_slot = 1;
5048 } else if (server.cluster->importing_slots_from[slot] != NULL) {
5049 importing_slot = 1;
5050 }
5051 } else {
5052 /* If it is not the first key, make sure it is exactly
5053 * the same key as the first we saw. */
5054 if (!equalStringObjects(firstkey,thiskey)) {
5055 if (slot != thisslot) {
5056 /* Error: multiple keys from different slots. */
5057 getKeysFreeResult(keyindex);
5058 if (error_code)
5059 *error_code = CLUSTER_REDIR_CROSS_SLOT;
5060 return NULL;
5061 } else {
5062 /* Flag this request as one with multiple different
5063 * keys. */
5064 multiple_keys = 1;
5065 }
5066 }
5067 }
5068
5069 /* Migrating / Importing slot? Count keys we don't have. */
5070 if ((migrating_slot || importing_slot) &&
5071 lookupKeyRead(&server.db[0],thiskey) == NULL)
5072 {
5073 missing_keys++;
5074 }
5075 }
5076 getKeysFreeResult(keyindex);
5077 }
5078
5079 /* No key at all in the command? Then we can serve the request
5080 * without redirections or errors in all the cases. */
5081 if (n == NULL) return myself;
5082
5083 /* Cluster is globally down but we got keys? We can't serve the request. */
5084 if (server.cluster->state != CLUSTER_OK) {
5085 if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
5086 return NULL;
5087 }
5088
5089 /* Return the hashslot by reference. */
5090 if (hashslot) *hashslot = slot;
5091
5092 /* MIGRATE always works in the context of the local node if the slot
5093 * is open (migrating or importing state). We need to be able to freely
5094 * move keys among instances in this case. */
5095 if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
5096 return myself;
5097
5098 /* If we don't have all the keys and we are migrating the slot, send
5099 * an ASK redirection. */
5100 if (migrating_slot && missing_keys) {
5101 if (error_code) *error_code = CLUSTER_REDIR_ASK;
5102 return server.cluster->migrating_slots_to[slot];
5103 }
5104
5105 /* If we are receiving the slot, and the client correctly flagged the
5106 * request as "ASKING", we can serve the request. However if the request
5107 * involves multiple keys and we don't have them all, the only option is
5108 * to send a TRYAGAIN error. */
5109 if (importing_slot &&
5110 (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
5111 {
5112 if (multiple_keys && missing_keys) {
5113 if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
5114 return NULL;
5115 } else {
5116 return myself;
5117 }
5118 }
5119
5120 /* Handle the read-only client case reading from a slave: if this
5121 * node is a slave and the request is about a hash slot our master
5122 * is serving, we can reply without redirection. */
5123 if (c->flags & CLIENT_READONLY &&
5124 cmd->flags & CMD_READONLY &&
5125 nodeIsSlave(myself) &&
5126 myself->slaveof == n)
5127 {
5128 return myself;
5129 }
5130
5131 /* Base case: just return the right node. However if this node is not
5132 * myself, set error_code to MOVED since we need to issue a redirection. */
5133 if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
5134 return n;
5135 }
5136
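/* Illustrative only: a condensed sketch of how a caller may combine
 * getNodeByQuery() with clusterRedirectClient() below; processCommand() in
 * server.c does something equivalent with additional bookkeeping. */
static int clusterRedirectIfNeededExample(client *c) {
    int hashslot = 0, error_code = CLUSTER_REDIR_NONE;
    clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                    &hashslot,&error_code);

    if (n == NULL || n != myself) {
        clusterRedirectClient(c,n,hashslot,error_code);
        return 1; /* The client was redirected or received an error. */
    }
    return 0; /* The command can be served by this node. */
}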
5137 /* Send the client the right redirection code, according to error_code
5138 * that should be set to one of CLUSTER_REDIR_* macros.
5139 *
5140 * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
5141 * are used, then the node 'n' should not be NULL, but should be the
5142 * node we want to mention in the redirection. Moreover hashslot should
5143 * be set to the hash slot that caused the redirection. */
5144 void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
5145 if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
5146 addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
5147 } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
5148 /* The request spans multiple keys in the same slot,
5149 * but the slot is not "stable" currently as there is
5150 * a migration or import in progress. */
5151 addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
5152 } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
5153 addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
5154 } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
5155 addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
5156 } else if (error_code == CLUSTER_REDIR_MOVED ||
5157 error_code == CLUSTER_REDIR_ASK)
5158 {
5159 addReplySds(c,sdscatprintf(sdsempty(),
5160 "-%s %d %s:%d\r\n",
5161 (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
5162 hashslot,n->ip,n->port));
5163 } else {
5164 serverPanic("getNodeByQuery() unknown error.");
5165 }
5166 }
5167
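/* For reference, these are the exact replies a client may receive from the
 * function above (slot and address values are made up):
 *
 *   -MOVED 3999 127.0.0.1:6381
 *   -ASK 3999 127.0.0.1:6381
 *   -CROSSSLOT Keys in request don't hash to the same slot
 *   -TRYAGAIN Multiple keys request during rehashing of slot
 *   -CLUSTERDOWN The cluster is down
 *   -CLUSTERDOWN Hash slot not served
 */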
5168 /* This function is called by the function processing clients incrementally
5169 * to detect timeouts, in order to handle the following case:
5170 *
5171 * 1) A client blocks with BLPOP or similar blocking operation.
5172 * 2) The master migrates the hash slot elsewhere or turns into a slave.
5173 * 3) The client may remain blocked forever (or up to the max timeout time)
5174 * waiting for a key change that will never happen.
5175 *
5176 * If the client is found to be blocked on a hash slot this node no
5177 * longer handles, the client is sent a redirection error, and the function
5178 * returns 1. Otherwise 0 is returned and no operation is performed. */
5179 int clusterRedirectBlockedClientIfNeeded(client *c) {
5180 if (c->flags & CLIENT_BLOCKED && c->btype == BLOCKED_LIST) {
5181 dictEntry *de;
5182 dictIterator *di;
5183
5184 /* If the cluster is down, unblock the client with the right error. */
5185 if (server.cluster->state == CLUSTER_FAIL) {
5186 clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
5187 return 1;
5188 }
5189
5190 di = dictGetIterator(c->bpop.keys);
5191 while((de = dictNext(di)) != NULL) {
5192 robj *key = dictGetKey(de);
5193 int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
5194 clusterNode *node = server.cluster->slots[slot];
5195
5196 /* We send an error and unblock the client if:
5197 * 1) The slot is unassigned, emitting a cluster down error.
5198 * 2) The slot is not handled by this node, nor being imported. */
5199 if (node != myself &&
5200 server.cluster->importing_slots_from[slot] == NULL)
5201 {
5202 if (node == NULL) {
5203 clusterRedirectClient(c,NULL,0,
5204 CLUSTER_REDIR_DOWN_UNBOUND);
5205 } else {
5206 clusterRedirectClient(c,node,slot,
5207 CLUSTER_REDIR_MOVED);
5208 }
/* Release the iterator before returning, otherwise it would leak. */
dictReleaseIterator(di);
5209 return 1;
5210 }
5211 }
5212 dictReleaseIterator(di);
5213 }
5214 return 0;
5215 }
5216