1 /* Redis CLI (command line interface) 2 * 3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * * Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * * Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * * Neither the name of Redis nor the names of its contributors may be used 15 * to endorse or promote products derived from this software without 16 * specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #include "fmacros.h" 32 #include "version.h" 33 34 #include <stdio.h> 35 #include <string.h> 36 #include <stdlib.h> 37 #include <signal.h> 38 #include <unistd.h> 39 #include <time.h> 40 #include <ctype.h> 41 #include <errno.h> 42 #include <sys/stat.h> 43 #include <sys/time.h> 44 #include <assert.h> 45 #include <fcntl.h> 46 #include <limits.h> 47 #include <math.h> 48 49 #include <hiredis.h> 50 #include <sds.h> /* use sds.h from hiredis, so that only one set of sds functions will be present in the binary */ 51 #include "dict.h" 52 #include "adlist.h" 53 #include "zmalloc.h" 54 #include "linenoise.h" 55 #include "help.h" 56 #include "anet.h" 57 #include "ae.h" 58 59 #define UNUSED(V) ((void) V) 60 61 #define OUTPUT_STANDARD 0 62 #define OUTPUT_RAW 1 63 #define OUTPUT_CSV 2 64 #define REDIS_CLI_KEEPALIVE_INTERVAL 15 /* seconds */ 65 #define REDIS_CLI_DEFAULT_PIPE_TIMEOUT 30 /* seconds */ 66 #define REDIS_CLI_HISTFILE_ENV "REDISCLI_HISTFILE" 67 #define REDIS_CLI_HISTFILE_DEFAULT ".rediscli_history" 68 #define REDIS_CLI_RCFILE_ENV "REDISCLI_RCFILE" 69 #define REDIS_CLI_RCFILE_DEFAULT ".redisclirc" 70 #define REDIS_CLI_AUTH_ENV "REDISCLI_AUTH" 71 #define REDIS_CLI_CLUSTER_YES_ENV "REDISCLI_CLUSTER_YES" 72 73 #define CLUSTER_MANAGER_SLOTS 16384 74 #define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000 75 #define CLUSTER_MANAGER_MIGRATE_PIPELINE 10 76 #define CLUSTER_MANAGER_REBALANCE_THRESHOLD 2 77 78 #define CLUSTER_MANAGER_INVALID_HOST_ARG \ 79 "[ERR] Invalid arguments: you need to pass either a valid " \ 80 "address (ie. 120.0.0.1:7000) or space separated IP " \ 81 "and port (ie. 120.0.0.1 7000)\n" 82 #define CLUSTER_MANAGER_MODE() (config.cluster_manager_command.name != NULL) 83 #define CLUSTER_MANAGER_MASTERS_COUNT(nodes, replicas) (nodes/(replicas + 1)) 84 #define CLUSTER_MANAGER_COMMAND(n,...) \ 85 (redisCommand(n->context, __VA_ARGS__)) 86 87 #define CLUSTER_MANAGER_NODE_ARRAY_FREE(array) zfree(array->alloc) 88 89 #define CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, err) \ 90 clusterManagerLogErr("Node %s:%d replied with error:\n%s\n", \ 91 n->ip, n->port, err); 92 93 #define clusterManagerLogInfo(...) \ 94 clusterManagerLog(CLUSTER_MANAGER_LOG_LVL_INFO,__VA_ARGS__) 95 96 #define clusterManagerLogErr(...) \ 97 clusterManagerLog(CLUSTER_MANAGER_LOG_LVL_ERR,__VA_ARGS__) 98 99 #define clusterManagerLogWarn(...) \ 100 clusterManagerLog(CLUSTER_MANAGER_LOG_LVL_WARN,__VA_ARGS__) 101 102 #define clusterManagerLogOk(...) \ 103 clusterManagerLog(CLUSTER_MANAGER_LOG_LVL_SUCCESS,__VA_ARGS__) 104 105 #define CLUSTER_MANAGER_FLAG_MYSELF 1 << 0 106 #define CLUSTER_MANAGER_FLAG_SLAVE 1 << 1 107 #define CLUSTER_MANAGER_FLAG_FRIEND 1 << 2 108 #define CLUSTER_MANAGER_FLAG_NOADDR 1 << 3 109 #define CLUSTER_MANAGER_FLAG_DISCONNECT 1 << 4 110 #define CLUSTER_MANAGER_FLAG_FAIL 1 << 5 111 112 #define CLUSTER_MANAGER_CMD_FLAG_FIX 1 << 0 113 #define CLUSTER_MANAGER_CMD_FLAG_SLAVE 1 << 1 114 #define CLUSTER_MANAGER_CMD_FLAG_YES 1 << 2 115 #define CLUSTER_MANAGER_CMD_FLAG_AUTOWEIGHTS 1 << 3 116 #define CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER 1 << 4 117 #define CLUSTER_MANAGER_CMD_FLAG_SIMULATE 1 << 5 118 #define CLUSTER_MANAGER_CMD_FLAG_REPLACE 1 << 6 119 #define CLUSTER_MANAGER_CMD_FLAG_COPY 1 << 7 120 #define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 8 121 #define CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS 1 << 9 122 123 #define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0 124 #define CLUSTER_MANAGER_OPT_COLD 1 << 1 125 #define CLUSTER_MANAGER_OPT_UPDATE 1 << 2 126 #define CLUSTER_MANAGER_OPT_QUIET 1 << 6 127 #define CLUSTER_MANAGER_OPT_VERBOSE 1 << 7 128 129 #define CLUSTER_MANAGER_LOG_LVL_INFO 1 130 #define CLUSTER_MANAGER_LOG_LVL_WARN 2 131 #define CLUSTER_MANAGER_LOG_LVL_ERR 3 132 #define CLUSTER_MANAGER_LOG_LVL_SUCCESS 4 133 134 #define LOG_COLOR_BOLD "29;1m" 135 #define LOG_COLOR_RED "31;1m" 136 #define LOG_COLOR_GREEN "32;1m" 137 #define LOG_COLOR_YELLOW "33;1m" 138 #define LOG_COLOR_RESET "0m" 139 140 /* cliConnect() flags. */ 141 #define CC_FORCE (1<<0) /* Re-connect if already connected. */ 142 #define CC_QUIET (1<<1) /* Don't log connecting errors. */ 143 144 /* --latency-dist palettes. */ 145 int spectrum_palette_color_size = 19; 146 int spectrum_palette_color[] = {0,233,234,235,237,239,241,243,245,247,144,143,142,184,226,214,208,202,196}; 147 148 int spectrum_palette_mono_size = 13; 149 int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253}; 150 151 /* The actual palette in use. */ 152 int *spectrum_palette; 153 int spectrum_palette_size; 154 155 /* Dict Helpers */ 156 157 static uint64_t dictSdsHash(const void *key); 158 static int dictSdsKeyCompare(void *privdata, const void *key1, 159 const void *key2); 160 static void dictSdsDestructor(void *privdata, void *val); 161 static void dictListDestructor(void *privdata, void *val); 162 163 /* Cluster Manager Command Info */ 164 typedef struct clusterManagerCommand { 165 char *name; 166 int argc; 167 char **argv; 168 int flags; 169 int replicas; 170 char *from; 171 char *to; 172 char **weight; 173 int weight_argc; 174 char *master_id; 175 int slots; 176 int timeout; 177 int pipeline; 178 float threshold; 179 } clusterManagerCommand; 180 181 static void createClusterManagerCommand(char *cmdname, int argc, char **argv); 182 183 184 static redisContext *context; 185 static struct config { 186 char *hostip; 187 int hostport; 188 char *hostsocket; 189 long repeat; 190 long interval; 191 int dbnum; 192 int interactive; 193 int shutdown; 194 int monitor_mode; 195 int pubsub_mode; 196 int latency_mode; 197 int latency_dist_mode; 198 int latency_history; 199 int lru_test_mode; 200 long long lru_test_sample_size; 201 int cluster_mode; 202 int cluster_reissue_command; 203 int slave_mode; 204 int pipe_mode; 205 int pipe_timeout; 206 int getrdb_mode; 207 int stat_mode; 208 int scan_mode; 209 int intrinsic_latency_mode; 210 int intrinsic_latency_duration; 211 char *pattern; 212 char *rdb_filename; 213 int bigkeys; 214 int memkeys; 215 unsigned memkeys_samples; 216 int hotkeys; 217 int stdinarg; /* get last arg from stdin. (-x option) */ 218 char *auth; 219 int output; /* output mode, see OUTPUT_* defines */ 220 sds mb_delim; 221 char prompt[128]; 222 char *eval; 223 int eval_ldb; 224 int eval_ldb_sync; /* Ask for synchronous mode of the Lua debugger. */ 225 int eval_ldb_end; /* Lua debugging session ended. */ 226 int enable_ldb_on_eval; /* Handle manual SCRIPT DEBUG + EVAL commands. */ 227 int last_cmd_type; 228 int verbose; 229 clusterManagerCommand cluster_manager_command; 230 int no_auth_warning; 231 } config; 232 233 /* User preferences. */ 234 static struct pref { 235 int hints; 236 } pref; 237 238 static volatile sig_atomic_t force_cancel_loop = 0; 239 static void usage(void); 240 static void slaveMode(void); 241 char *redisGitSHA1(void); 242 char *redisGitDirty(void); 243 static int cliConnect(int force); 244 245 static char *getInfoField(char *info, char *field); 246 static long getLongInfoField(char *info, char *field); 247 248 /*------------------------------------------------------------------------------ 249 * Utility functions 250 *--------------------------------------------------------------------------- */ 251 252 uint16_t crc16(const char *buf, int len); 253 254 static long long ustime(void) { 255 struct timeval tv; 256 long long ust; 257 258 gettimeofday(&tv, NULL); 259 ust = ((long long)tv.tv_sec)*1000000; 260 ust += tv.tv_usec; 261 return ust; 262 } 263 264 static long long mstime(void) { 265 return ustime()/1000; 266 } 267 268 static void cliRefreshPrompt(void) { 269 if (config.eval_ldb) return; 270 271 sds prompt = sdsempty(); 272 if (config.hostsocket != NULL) { 273 prompt = sdscatfmt(prompt,"redis %s",config.hostsocket); 274 } else { 275 char addr[256]; 276 anetFormatAddr(addr, sizeof(addr), config.hostip, config.hostport); 277 prompt = sdscatlen(prompt,addr,strlen(addr)); 278 } 279 280 /* Add [dbnum] if needed */ 281 if (config.dbnum != 0) 282 prompt = sdscatfmt(prompt,"[%i]",config.dbnum); 283 284 /* Copy the prompt in the static buffer. */ 285 prompt = sdscatlen(prompt,"> ",2); 286 snprintf(config.prompt,sizeof(config.prompt),"%s",prompt); 287 sdsfree(prompt); 288 } 289 290 /* Return the name of the dotfile for the specified 'dotfilename'. 291 * Normally it just concatenates user $HOME to the file specified 292 * in 'dotfilename'. However if the environment varialbe 'envoverride' 293 * is set, its value is taken as the path. 294 * 295 * The function returns NULL (if the file is /dev/null or cannot be 296 * obtained for some error), or an SDS string that must be freed by 297 * the user. */ 298 static sds getDotfilePath(char *envoverride, char *dotfilename) { 299 char *path = NULL; 300 sds dotPath = NULL; 301 302 /* Check the env for a dotfile override. */ 303 path = getenv(envoverride); 304 if (path != NULL && *path != '\0') { 305 if (!strcmp("/dev/null", path)) { 306 return NULL; 307 } 308 309 /* If the env is set, return it. */ 310 dotPath = sdsnew(path); 311 } else { 312 char *home = getenv("HOME"); 313 if (home != NULL && *home != '\0') { 314 /* If no override is set use $HOME/<dotfilename>. */ 315 dotPath = sdscatprintf(sdsempty(), "%s/%s", home, dotfilename); 316 } 317 } 318 return dotPath; 319 } 320 321 /* URL-style percent decoding. */ 322 #define isHexChar(c) (isdigit(c) || (c >= 'a' && c <= 'f')) 323 #define decodeHexChar(c) (isdigit(c) ? c - '0' : c - 'a' + 10) 324 #define decodeHex(h, l) ((decodeHexChar(h) << 4) + decodeHexChar(l)) 325 326 static sds percentDecode(const char *pe, size_t len) { 327 const char *end = pe + len; 328 sds ret = sdsempty(); 329 const char *curr = pe; 330 331 while (curr < end) { 332 if (*curr == '%') { 333 if ((end - curr) < 2) { 334 fprintf(stderr, "Incomplete URI encoding\n"); 335 exit(1); 336 } 337 338 char h = tolower(*(++curr)); 339 char l = tolower(*(++curr)); 340 if (!isHexChar(h) || !isHexChar(l)) { 341 fprintf(stderr, "Illegal character in URI encoding\n"); 342 exit(1); 343 } 344 char c = decodeHex(h, l); 345 ret = sdscatlen(ret, &c, 1); 346 curr++; 347 } else { 348 ret = sdscatlen(ret, curr++, 1); 349 } 350 } 351 352 return ret; 353 } 354 355 /* Parse a URI and extract the server connection information. 356 * URI scheme is based on the the provisional specification[1] excluding support 357 * for query parameters. Valid URIs are: 358 * scheme: "redis://" 359 * authority: [<username> ":"] <password> "@"] [<hostname> [":" <port>]] 360 * path: ["/" [<db>]] 361 * 362 * [1]: https://www.iana.org/assignments/uri-schemes/prov/redis */ 363 static void parseRedisUri(const char *uri) { 364 365 const char *scheme = "redis://"; 366 const char *curr = uri; 367 const char *end = uri + strlen(uri); 368 const char *userinfo, *username, *port, *host, *path; 369 370 /* URI must start with a valid scheme. */ 371 if (strncasecmp(scheme, curr, strlen(scheme))) { 372 fprintf(stderr,"Invalid URI scheme\n"); 373 exit(1); 374 } 375 curr += strlen(scheme); 376 if (curr == end) return; 377 378 /* Extract user info. */ 379 if ((userinfo = strchr(curr,'@'))) { 380 if ((username = strchr(curr, ':')) && username < userinfo) { 381 /* If provided, username is ignored. */ 382 curr = username + 1; 383 } 384 385 config.auth = percentDecode(curr, userinfo - curr); 386 curr = userinfo + 1; 387 } 388 if (curr == end) return; 389 390 /* Extract host and port. */ 391 path = strchr(curr, '/'); 392 if (*curr != '/') { 393 host = path ? path - 1 : end; 394 if ((port = strchr(curr, ':'))) { 395 config.hostport = atoi(port + 1); 396 host = port - 1; 397 } 398 config.hostip = sdsnewlen(curr, host - curr + 1); 399 } 400 curr = path ? path + 1 : end; 401 if (curr == end) return; 402 403 /* Extract database number. */ 404 config.dbnum = atoi(curr); 405 } 406 407 static uint64_t dictSdsHash(const void *key) { 408 return dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); 409 } 410 411 static int dictSdsKeyCompare(void *privdata, const void *key1, 412 const void *key2) 413 { 414 int l1,l2; 415 DICT_NOTUSED(privdata); 416 417 l1 = sdslen((sds)key1); 418 l2 = sdslen((sds)key2); 419 if (l1 != l2) return 0; 420 return memcmp(key1, key2, l1) == 0; 421 } 422 423 static void dictSdsDestructor(void *privdata, void *val) 424 { 425 DICT_NOTUSED(privdata); 426 sdsfree(val); 427 } 428 429 void dictListDestructor(void *privdata, void *val) 430 { 431 DICT_NOTUSED(privdata); 432 listRelease((list*)val); 433 } 434 435 /* _serverAssert is needed by dict */ 436 void _serverAssert(const char *estr, const char *file, int line) { 437 fprintf(stderr, "=== ASSERTION FAILED ==="); 438 fprintf(stderr, "==> %s:%d '%s' is not true",file,line,estr); 439 *((char*)-1) = 'x'; 440 } 441 442 /*------------------------------------------------------------------------------ 443 * Help functions 444 *--------------------------------------------------------------------------- */ 445 446 #define CLI_HELP_COMMAND 1 447 #define CLI_HELP_GROUP 2 448 449 typedef struct { 450 int type; 451 int argc; 452 sds *argv; 453 sds full; 454 455 /* Only used for help on commands */ 456 struct commandHelp *org; 457 } helpEntry; 458 459 static helpEntry *helpEntries; 460 static int helpEntriesLen; 461 462 static sds cliVersion(void) { 463 sds version; 464 version = sdscatprintf(sdsempty(), "%s", REDIS_VERSION); 465 466 /* Add git commit and working tree status when available */ 467 if (strtoll(redisGitSHA1(),NULL,16)) { 468 version = sdscatprintf(version, " (git:%s", redisGitSHA1()); 469 if (strtoll(redisGitDirty(),NULL,10)) 470 version = sdscatprintf(version, "-dirty"); 471 version = sdscat(version, ")"); 472 } 473 return version; 474 } 475 476 static void cliInitHelp(void) { 477 int commandslen = sizeof(commandHelp)/sizeof(struct commandHelp); 478 int groupslen = sizeof(commandGroups)/sizeof(char*); 479 int i, len, pos = 0; 480 helpEntry tmp; 481 482 helpEntriesLen = len = commandslen+groupslen; 483 helpEntries = zmalloc(sizeof(helpEntry)*len); 484 485 for (i = 0; i < groupslen; i++) { 486 tmp.argc = 1; 487 tmp.argv = zmalloc(sizeof(sds)); 488 tmp.argv[0] = sdscatprintf(sdsempty(),"@%s",commandGroups[i]); 489 tmp.full = tmp.argv[0]; 490 tmp.type = CLI_HELP_GROUP; 491 tmp.org = NULL; 492 helpEntries[pos++] = tmp; 493 } 494 495 for (i = 0; i < commandslen; i++) { 496 tmp.argv = sdssplitargs(commandHelp[i].name,&tmp.argc); 497 tmp.full = sdsnew(commandHelp[i].name); 498 tmp.type = CLI_HELP_COMMAND; 499 tmp.org = &commandHelp[i]; 500 helpEntries[pos++] = tmp; 501 } 502 } 503 504 /* cliInitHelp() setups the helpEntries array with the command and group 505 * names from the help.h file. However the Redis instance we are connecting 506 * to may support more commands, so this function integrates the previous 507 * entries with additional entries obtained using the COMMAND command 508 * available in recent versions of Redis. */ 509 static void cliIntegrateHelp(void) { 510 if (cliConnect(CC_QUIET) == REDIS_ERR) return; 511 512 redisReply *reply = redisCommand(context, "COMMAND"); 513 if(reply == NULL || reply->type != REDIS_REPLY_ARRAY) return; 514 515 /* Scan the array reported by COMMAND and fill only the entries that 516 * don't already match what we have. */ 517 for (size_t j = 0; j < reply->elements; j++) { 518 redisReply *entry = reply->element[j]; 519 if (entry->type != REDIS_REPLY_ARRAY || entry->elements < 4 || 520 entry->element[0]->type != REDIS_REPLY_STRING || 521 entry->element[1]->type != REDIS_REPLY_INTEGER || 522 entry->element[3]->type != REDIS_REPLY_INTEGER) return; 523 char *cmdname = entry->element[0]->str; 524 int i; 525 526 for (i = 0; i < helpEntriesLen; i++) { 527 helpEntry *he = helpEntries+i; 528 if (!strcasecmp(he->argv[0],cmdname)) 529 break; 530 } 531 if (i != helpEntriesLen) continue; 532 533 helpEntriesLen++; 534 helpEntries = zrealloc(helpEntries,sizeof(helpEntry)*helpEntriesLen); 535 helpEntry *new = helpEntries+(helpEntriesLen-1); 536 537 new->argc = 1; 538 new->argv = zmalloc(sizeof(sds)); 539 new->argv[0] = sdsnew(cmdname); 540 new->full = new->argv[0]; 541 new->type = CLI_HELP_COMMAND; 542 sdstoupper(new->argv[0]); 543 544 struct commandHelp *ch = zmalloc(sizeof(*ch)); 545 ch->name = new->argv[0]; 546 ch->params = sdsempty(); 547 int args = llabs(entry->element[1]->integer); 548 args--; /* Remove the command name itself. */ 549 if (entry->element[3]->integer == 1) { 550 ch->params = sdscat(ch->params,"key "); 551 args--; 552 } 553 while(args-- > 0) ch->params = sdscat(ch->params,"arg "); 554 if (entry->element[1]->integer < 0) 555 ch->params = sdscat(ch->params,"...options..."); 556 ch->summary = "Help not available"; 557 ch->group = 0; 558 ch->since = "not known"; 559 new->org = ch; 560 } 561 freeReplyObject(reply); 562 } 563 564 /* Output command help to stdout. */ 565 static void cliOutputCommandHelp(struct commandHelp *help, int group) { 566 printf("\r\n \x1b[1m%s\x1b[0m \x1b[90m%s\x1b[0m\r\n", help->name, help->params); 567 printf(" \x1b[33msummary:\x1b[0m %s\r\n", help->summary); 568 printf(" \x1b[33msince:\x1b[0m %s\r\n", help->since); 569 if (group) { 570 printf(" \x1b[33mgroup:\x1b[0m %s\r\n", commandGroups[help->group]); 571 } 572 } 573 574 /* Print generic help. */ 575 static void cliOutputGenericHelp(void) { 576 sds version = cliVersion(); 577 printf( 578 "redis-cli %s\n" 579 "To get help about Redis commands type:\n" 580 " \"help @<group>\" to get a list of commands in <group>\n" 581 " \"help <command>\" for help on <command>\n" 582 " \"help <tab>\" to get a list of possible help topics\n" 583 " \"quit\" to exit\n" 584 "\n" 585 "To set redis-cli preferences:\n" 586 " \":set hints\" enable online hints\n" 587 " \":set nohints\" disable online hints\n" 588 "Set your preferences in ~/.redisclirc\n", 589 version 590 ); 591 sdsfree(version); 592 } 593 594 /* Output all command help, filtering by group or command name. */ 595 static void cliOutputHelp(int argc, char **argv) { 596 int i, j, len; 597 int group = -1; 598 helpEntry *entry; 599 struct commandHelp *help; 600 601 if (argc == 0) { 602 cliOutputGenericHelp(); 603 return; 604 } else if (argc > 0 && argv[0][0] == '@') { 605 len = sizeof(commandGroups)/sizeof(char*); 606 for (i = 0; i < len; i++) { 607 if (strcasecmp(argv[0]+1,commandGroups[i]) == 0) { 608 group = i; 609 break; 610 } 611 } 612 } 613 614 assert(argc > 0); 615 for (i = 0; i < helpEntriesLen; i++) { 616 entry = &helpEntries[i]; 617 if (entry->type != CLI_HELP_COMMAND) continue; 618 619 help = entry->org; 620 if (group == -1) { 621 /* Compare all arguments */ 622 if (argc == entry->argc) { 623 for (j = 0; j < argc; j++) { 624 if (strcasecmp(argv[j],entry->argv[j]) != 0) break; 625 } 626 if (j == argc) { 627 cliOutputCommandHelp(help,1); 628 } 629 } 630 } else { 631 if (group == help->group) { 632 cliOutputCommandHelp(help,0); 633 } 634 } 635 } 636 printf("\r\n"); 637 } 638 639 /* Linenoise completion callback. */ 640 static void completionCallback(const char *buf, linenoiseCompletions *lc) { 641 size_t startpos = 0; 642 int mask; 643 int i; 644 size_t matchlen; 645 sds tmp; 646 647 if (strncasecmp(buf,"help ",5) == 0) { 648 startpos = 5; 649 while (isspace(buf[startpos])) startpos++; 650 mask = CLI_HELP_COMMAND | CLI_HELP_GROUP; 651 } else { 652 mask = CLI_HELP_COMMAND; 653 } 654 655 for (i = 0; i < helpEntriesLen; i++) { 656 if (!(helpEntries[i].type & mask)) continue; 657 658 matchlen = strlen(buf+startpos); 659 if (strncasecmp(buf+startpos,helpEntries[i].full,matchlen) == 0) { 660 tmp = sdsnewlen(buf,startpos); 661 tmp = sdscat(tmp,helpEntries[i].full); 662 linenoiseAddCompletion(lc,tmp); 663 sdsfree(tmp); 664 } 665 } 666 } 667 668 /* Linenoise hints callback. */ 669 static char *hintsCallback(const char *buf, int *color, int *bold) { 670 if (!pref.hints) return NULL; 671 672 int i, argc, buflen = strlen(buf); 673 sds *argv = sdssplitargs(buf,&argc); 674 int endspace = buflen && isspace(buf[buflen-1]); 675 676 /* Check if the argument list is empty and return ASAP. */ 677 if (argc == 0) { 678 sdsfreesplitres(argv,argc); 679 return NULL; 680 } 681 682 for (i = 0; i < helpEntriesLen; i++) { 683 if (!(helpEntries[i].type & CLI_HELP_COMMAND)) continue; 684 685 if (strcasecmp(argv[0],helpEntries[i].full) == 0) 686 { 687 *color = 90; 688 *bold = 0; 689 sds hint = sdsnew(helpEntries[i].org->params); 690 691 /* Remove arguments from the returned hint to show only the 692 * ones the user did not yet typed. */ 693 int toremove = argc-1; 694 while(toremove > 0 && sdslen(hint)) { 695 if (hint[0] == '[') break; 696 if (hint[0] == ' ') toremove--; 697 sdsrange(hint,1,-1); 698 } 699 700 /* Add an initial space if needed. */ 701 if (!endspace) { 702 sds newhint = sdsnewlen(" ",1); 703 newhint = sdscatsds(newhint,hint); 704 sdsfree(hint); 705 hint = newhint; 706 } 707 708 sdsfreesplitres(argv,argc); 709 return hint; 710 } 711 } 712 sdsfreesplitres(argv,argc); 713 return NULL; 714 } 715 716 static void freeHintsCallback(void *ptr) { 717 sdsfree(ptr); 718 } 719 720 /*------------------------------------------------------------------------------ 721 * Networking / parsing 722 *--------------------------------------------------------------------------- */ 723 724 /* Send AUTH command to the server */ 725 static int cliAuth(void) { 726 redisReply *reply; 727 if (config.auth == NULL) return REDIS_OK; 728 729 reply = redisCommand(context,"AUTH %s",config.auth); 730 if (reply != NULL) { 731 freeReplyObject(reply); 732 return REDIS_OK; 733 } 734 return REDIS_ERR; 735 } 736 737 /* Send SELECT dbnum to the server */ 738 static int cliSelect(void) { 739 redisReply *reply; 740 if (config.dbnum == 0) return REDIS_OK; 741 742 reply = redisCommand(context,"SELECT %d",config.dbnum); 743 if (reply != NULL) { 744 int result = REDIS_OK; 745 if (reply->type == REDIS_REPLY_ERROR) result = REDIS_ERR; 746 freeReplyObject(reply); 747 return result; 748 } 749 return REDIS_ERR; 750 } 751 752 /* Connect to the server. It is possible to pass certain flags to the function: 753 * CC_FORCE: The connection is performed even if there is already 754 * a connected socket. 755 * CC_QUIET: Don't print errors if connection fails. */ 756 static int cliConnect(int flags) { 757 if (context == NULL || flags & CC_FORCE) { 758 if (context != NULL) { 759 redisFree(context); 760 } 761 762 if (config.hostsocket == NULL) { 763 context = redisConnect(config.hostip,config.hostport); 764 } else { 765 context = redisConnectUnix(config.hostsocket); 766 } 767 768 if (context->err) { 769 if (!(flags & CC_QUIET)) { 770 fprintf(stderr,"Could not connect to Redis at "); 771 if (config.hostsocket == NULL) 772 fprintf(stderr,"%s:%d: %s\n", 773 config.hostip,config.hostport,context->errstr); 774 else 775 fprintf(stderr,"%s: %s\n", 776 config.hostsocket,context->errstr); 777 } 778 redisFree(context); 779 context = NULL; 780 return REDIS_ERR; 781 } 782 783 /* Set aggressive KEEP_ALIVE socket option in the Redis context socket 784 * in order to prevent timeouts caused by the execution of long 785 * commands. At the same time this improves the detection of real 786 * errors. */ 787 anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL); 788 789 /* Do AUTH and select the right DB. */ 790 if (cliAuth() != REDIS_OK) 791 return REDIS_ERR; 792 if (cliSelect() != REDIS_OK) 793 return REDIS_ERR; 794 } 795 return REDIS_OK; 796 } 797 798 static void cliPrintContextError(void) { 799 if (context == NULL) return; 800 fprintf(stderr,"Error: %s\n",context->errstr); 801 } 802 803 static sds cliFormatReplyTTY(redisReply *r, char *prefix) { 804 sds out = sdsempty(); 805 switch (r->type) { 806 case REDIS_REPLY_ERROR: 807 out = sdscatprintf(out,"(error) %s\n", r->str); 808 break; 809 case REDIS_REPLY_STATUS: 810 out = sdscat(out,r->str); 811 out = sdscat(out,"\n"); 812 break; 813 case REDIS_REPLY_INTEGER: 814 out = sdscatprintf(out,"(integer) %lld\n",r->integer); 815 break; 816 case REDIS_REPLY_STRING: 817 /* If you are producing output for the standard output we want 818 * a more interesting output with quoted characters and so forth */ 819 out = sdscatrepr(out,r->str,r->len); 820 out = sdscat(out,"\n"); 821 break; 822 case REDIS_REPLY_NIL: 823 out = sdscat(out,"(nil)\n"); 824 break; 825 case REDIS_REPLY_ARRAY: 826 if (r->elements == 0) { 827 out = sdscat(out,"(empty list or set)\n"); 828 } else { 829 unsigned int i, idxlen = 0; 830 char _prefixlen[16]; 831 char _prefixfmt[16]; 832 sds _prefix; 833 sds tmp; 834 835 /* Calculate chars needed to represent the largest index */ 836 i = r->elements; 837 do { 838 idxlen++; 839 i /= 10; 840 } while(i); 841 842 /* Prefix for nested multi bulks should grow with idxlen+2 spaces */ 843 memset(_prefixlen,' ',idxlen+2); 844 _prefixlen[idxlen+2] = '\0'; 845 _prefix = sdscat(sdsnew(prefix),_prefixlen); 846 847 /* Setup prefix format for every entry */ 848 snprintf(_prefixfmt,sizeof(_prefixfmt),"%%s%%%ud) ",idxlen); 849 850 for (i = 0; i < r->elements; i++) { 851 /* Don't use the prefix for the first element, as the parent 852 * caller already prepended the index number. */ 853 out = sdscatprintf(out,_prefixfmt,i == 0 ? "" : prefix,i+1); 854 855 /* Format the multi bulk entry */ 856 tmp = cliFormatReplyTTY(r->element[i],_prefix); 857 out = sdscatlen(out,tmp,sdslen(tmp)); 858 sdsfree(tmp); 859 } 860 sdsfree(_prefix); 861 } 862 break; 863 default: 864 fprintf(stderr,"Unknown reply type: %d\n", r->type); 865 exit(1); 866 } 867 return out; 868 } 869 870 int isColorTerm(void) { 871 char *t = getenv("TERM"); 872 return t != NULL && strstr(t,"xterm") != NULL; 873 } 874 875 /* Helper function for sdsCatColorizedLdbReply() appending colorize strings 876 * to an SDS string. */ 877 sds sdscatcolor(sds o, char *s, size_t len, char *color) { 878 if (!isColorTerm()) return sdscatlen(o,s,len); 879 880 int bold = strstr(color,"bold") != NULL; 881 int ccode = 37; /* Defaults to white. */ 882 if (strstr(color,"red")) ccode = 31; 883 else if (strstr(color,"green")) ccode = 32; 884 else if (strstr(color,"yellow")) ccode = 33; 885 else if (strstr(color,"blue")) ccode = 34; 886 else if (strstr(color,"magenta")) ccode = 35; 887 else if (strstr(color,"cyan")) ccode = 36; 888 else if (strstr(color,"white")) ccode = 37; 889 890 o = sdscatfmt(o,"\033[%i;%i;49m",bold,ccode); 891 o = sdscatlen(o,s,len); 892 o = sdscat(o,"\033[0m"); 893 return o; 894 } 895 896 /* Colorize Lua debugger status replies according to the prefix they 897 * have. */ 898 sds sdsCatColorizedLdbReply(sds o, char *s, size_t len) { 899 char *color = "white"; 900 901 if (strstr(s,"<debug>")) color = "bold"; 902 if (strstr(s,"<redis>")) color = "green"; 903 if (strstr(s,"<reply>")) color = "cyan"; 904 if (strstr(s,"<error>")) color = "red"; 905 if (strstr(s,"<hint>")) color = "bold"; 906 if (strstr(s,"<value>") || strstr(s,"<retval>")) color = "magenta"; 907 if (len > 4 && isdigit(s[3])) { 908 if (s[1] == '>') color = "yellow"; /* Current line. */ 909 else if (s[2] == '#') color = "bold"; /* Break point. */ 910 } 911 return sdscatcolor(o,s,len,color); 912 } 913 914 static sds cliFormatReplyRaw(redisReply *r) { 915 sds out = sdsempty(), tmp; 916 size_t i; 917 918 switch (r->type) { 919 case REDIS_REPLY_NIL: 920 /* Nothing... */ 921 break; 922 case REDIS_REPLY_ERROR: 923 out = sdscatlen(out,r->str,r->len); 924 out = sdscatlen(out,"\n",1); 925 break; 926 case REDIS_REPLY_STATUS: 927 case REDIS_REPLY_STRING: 928 if (r->type == REDIS_REPLY_STATUS && config.eval_ldb) { 929 /* The Lua debugger replies with arrays of simple (status) 930 * strings. We colorize the output for more fun if this 931 * is a debugging session. */ 932 933 /* Detect the end of a debugging session. */ 934 if (strstr(r->str,"<endsession>") == r->str) { 935 config.enable_ldb_on_eval = 0; 936 config.eval_ldb = 0; 937 config.eval_ldb_end = 1; /* Signal the caller session ended. */ 938 config.output = OUTPUT_STANDARD; 939 cliRefreshPrompt(); 940 } else { 941 out = sdsCatColorizedLdbReply(out,r->str,r->len); 942 } 943 } else { 944 out = sdscatlen(out,r->str,r->len); 945 } 946 break; 947 case REDIS_REPLY_INTEGER: 948 out = sdscatprintf(out,"%lld",r->integer); 949 break; 950 case REDIS_REPLY_ARRAY: 951 for (i = 0; i < r->elements; i++) { 952 if (i > 0) out = sdscat(out,config.mb_delim); 953 tmp = cliFormatReplyRaw(r->element[i]); 954 out = sdscatlen(out,tmp,sdslen(tmp)); 955 sdsfree(tmp); 956 } 957 break; 958 default: 959 fprintf(stderr,"Unknown reply type: %d\n", r->type); 960 exit(1); 961 } 962 return out; 963 } 964 965 static sds cliFormatReplyCSV(redisReply *r) { 966 unsigned int i; 967 968 sds out = sdsempty(); 969 switch (r->type) { 970 case REDIS_REPLY_ERROR: 971 out = sdscat(out,"ERROR,"); 972 out = sdscatrepr(out,r->str,strlen(r->str)); 973 break; 974 case REDIS_REPLY_STATUS: 975 out = sdscatrepr(out,r->str,r->len); 976 break; 977 case REDIS_REPLY_INTEGER: 978 out = sdscatprintf(out,"%lld",r->integer); 979 break; 980 case REDIS_REPLY_STRING: 981 out = sdscatrepr(out,r->str,r->len); 982 break; 983 case REDIS_REPLY_NIL: 984 out = sdscat(out,"NIL"); 985 break; 986 case REDIS_REPLY_ARRAY: 987 for (i = 0; i < r->elements; i++) { 988 sds tmp = cliFormatReplyCSV(r->element[i]); 989 out = sdscatlen(out,tmp,sdslen(tmp)); 990 if (i != r->elements-1) out = sdscat(out,","); 991 sdsfree(tmp); 992 } 993 break; 994 default: 995 fprintf(stderr,"Unknown reply type: %d\n", r->type); 996 exit(1); 997 } 998 return out; 999 } 1000 1001 static int cliReadReply(int output_raw_strings) { 1002 void *_reply; 1003 redisReply *reply; 1004 sds out = NULL; 1005 int output = 1; 1006 1007 if (redisGetReply(context,&_reply) != REDIS_OK) { 1008 if (config.shutdown) { 1009 redisFree(context); 1010 context = NULL; 1011 return REDIS_OK; 1012 } 1013 if (config.interactive) { 1014 /* Filter cases where we should reconnect */ 1015 if (context->err == REDIS_ERR_IO && 1016 (errno == ECONNRESET || errno == EPIPE)) 1017 return REDIS_ERR; 1018 if (context->err == REDIS_ERR_EOF) 1019 return REDIS_ERR; 1020 } 1021 cliPrintContextError(); 1022 exit(1); 1023 return REDIS_ERR; /* avoid compiler warning */ 1024 } 1025 1026 reply = (redisReply*)_reply; 1027 1028 config.last_cmd_type = reply->type; 1029 1030 /* Check if we need to connect to a different node and reissue the 1031 * request. */ 1032 if (config.cluster_mode && reply->type == REDIS_REPLY_ERROR && 1033 (!strncmp(reply->str,"MOVED",5) || !strcmp(reply->str,"ASK"))) 1034 { 1035 char *p = reply->str, *s; 1036 int slot; 1037 1038 output = 0; 1039 /* Comments show the position of the pointer as: 1040 * 1041 * [S] for pointer 's' 1042 * [P] for pointer 'p' 1043 */ 1044 s = strchr(p,' '); /* MOVED[S]3999 127.0.0.1:6381 */ 1045 p = strchr(s+1,' '); /* MOVED[S]3999[P]127.0.0.1:6381 */ 1046 *p = '\0'; 1047 slot = atoi(s+1); 1048 s = strrchr(p+1,':'); /* MOVED 3999[P]127.0.0.1[S]6381 */ 1049 *s = '\0'; 1050 sdsfree(config.hostip); 1051 config.hostip = sdsnew(p+1); 1052 config.hostport = atoi(s+1); 1053 if (config.interactive) 1054 printf("-> Redirected to slot [%d] located at %s:%d\n", 1055 slot, config.hostip, config.hostport); 1056 config.cluster_reissue_command = 1; 1057 cliRefreshPrompt(); 1058 } 1059 1060 if (output) { 1061 if (output_raw_strings) { 1062 out = cliFormatReplyRaw(reply); 1063 } else { 1064 if (config.output == OUTPUT_RAW) { 1065 out = cliFormatReplyRaw(reply); 1066 out = sdscat(out,"\n"); 1067 } else if (config.output == OUTPUT_STANDARD) { 1068 out = cliFormatReplyTTY(reply,""); 1069 } else if (config.output == OUTPUT_CSV) { 1070 out = cliFormatReplyCSV(reply); 1071 out = sdscat(out,"\n"); 1072 } 1073 } 1074 fwrite(out,sdslen(out),1,stdout); 1075 sdsfree(out); 1076 } 1077 freeReplyObject(reply); 1078 return REDIS_OK; 1079 } 1080 1081 static int cliSendCommand(int argc, char **argv, long repeat) { 1082 char *command = argv[0]; 1083 size_t *argvlen; 1084 int j, output_raw; 1085 1086 if (!config.eval_ldb && /* In debugging mode, let's pass "help" to Redis. */ 1087 (!strcasecmp(command,"help") || !strcasecmp(command,"?"))) { 1088 cliOutputHelp(--argc, ++argv); 1089 return REDIS_OK; 1090 } 1091 1092 if (context == NULL) return REDIS_ERR; 1093 1094 output_raw = 0; 1095 if (!strcasecmp(command,"info") || 1096 !strcasecmp(command,"lolwut") || 1097 (argc >= 2 && !strcasecmp(command,"debug") && 1098 !strcasecmp(argv[1],"htstats")) || 1099 (argc >= 2 && !strcasecmp(command,"debug") && 1100 !strcasecmp(argv[1],"htstats-key")) || 1101 (argc >= 2 && !strcasecmp(command,"memory") && 1102 (!strcasecmp(argv[1],"malloc-stats") || 1103 !strcasecmp(argv[1],"doctor"))) || 1104 (argc == 2 && !strcasecmp(command,"cluster") && 1105 (!strcasecmp(argv[1],"nodes") || 1106 !strcasecmp(argv[1],"info"))) || 1107 (argc >= 2 && !strcasecmp(command,"client") && 1108 !strcasecmp(argv[1],"list")) || 1109 (argc == 3 && !strcasecmp(command,"latency") && 1110 !strcasecmp(argv[1],"graph")) || 1111 (argc == 2 && !strcasecmp(command,"latency") && 1112 !strcasecmp(argv[1],"doctor"))) 1113 { 1114 output_raw = 1; 1115 } 1116 1117 if (!strcasecmp(command,"shutdown")) config.shutdown = 1; 1118 if (!strcasecmp(command,"monitor")) config.monitor_mode = 1; 1119 if (!strcasecmp(command,"subscribe") || 1120 !strcasecmp(command,"psubscribe")) config.pubsub_mode = 1; 1121 if (!strcasecmp(command,"sync") || 1122 !strcasecmp(command,"psync")) config.slave_mode = 1; 1123 1124 /* When the user manually calls SCRIPT DEBUG, setup the activation of 1125 * debugging mode on the next eval if needed. */ 1126 if (argc == 3 && !strcasecmp(argv[0],"script") && 1127 !strcasecmp(argv[1],"debug")) 1128 { 1129 if (!strcasecmp(argv[2],"yes") || !strcasecmp(argv[2],"sync")) { 1130 config.enable_ldb_on_eval = 1; 1131 } else { 1132 config.enable_ldb_on_eval = 0; 1133 } 1134 } 1135 1136 /* Actually activate LDB on EVAL if needed. */ 1137 if (!strcasecmp(command,"eval") && config.enable_ldb_on_eval) { 1138 config.eval_ldb = 1; 1139 config.output = OUTPUT_RAW; 1140 } 1141 1142 /* Setup argument length */ 1143 argvlen = zmalloc(argc*sizeof(size_t)); 1144 for (j = 0; j < argc; j++) 1145 argvlen[j] = sdslen(argv[j]); 1146 1147 while(repeat-- > 0) { 1148 redisAppendCommandArgv(context,argc,(const char**)argv,argvlen); 1149 while (config.monitor_mode) { 1150 if (cliReadReply(output_raw) != REDIS_OK) exit(1); 1151 fflush(stdout); 1152 } 1153 1154 if (config.pubsub_mode) { 1155 if (config.output != OUTPUT_RAW) 1156 printf("Reading messages... (press Ctrl-C to quit)\n"); 1157 while (1) { 1158 if (cliReadReply(output_raw) != REDIS_OK) exit(1); 1159 } 1160 } 1161 1162 if (config.slave_mode) { 1163 printf("Entering replica output mode... (press Ctrl-C to quit)\n"); 1164 slaveMode(); 1165 config.slave_mode = 0; 1166 zfree(argvlen); 1167 return REDIS_ERR; /* Error = slaveMode lost connection to master */ 1168 } 1169 1170 if (cliReadReply(output_raw) != REDIS_OK) { 1171 zfree(argvlen); 1172 return REDIS_ERR; 1173 } else { 1174 /* Store database number when SELECT was successfully executed. */ 1175 if (!strcasecmp(command,"select") && argc == 2 && config.last_cmd_type != REDIS_REPLY_ERROR) { 1176 config.dbnum = atoi(argv[1]); 1177 cliRefreshPrompt(); 1178 } else if (!strcasecmp(command,"auth") && argc == 2) { 1179 cliSelect(); 1180 } 1181 } 1182 if (config.interval) usleep(config.interval); 1183 fflush(stdout); /* Make it grep friendly */ 1184 } 1185 1186 zfree(argvlen); 1187 return REDIS_OK; 1188 } 1189 1190 /* Send a command reconnecting the link if needed. */ 1191 static redisReply *reconnectingRedisCommand(redisContext *c, const char *fmt, ...) { 1192 redisReply *reply = NULL; 1193 int tries = 0; 1194 va_list ap; 1195 1196 assert(!c->err); 1197 while(reply == NULL) { 1198 while (c->err & (REDIS_ERR_IO | REDIS_ERR_EOF)) { 1199 printf("\r\x1b[0K"); /* Cursor to left edge + clear line. */ 1200 printf("Reconnecting... %d\r", ++tries); 1201 fflush(stdout); 1202 1203 redisFree(c); 1204 c = redisConnect(config.hostip,config.hostport); 1205 usleep(1000000); 1206 } 1207 1208 va_start(ap,fmt); 1209 reply = redisvCommand(c,fmt,ap); 1210 va_end(ap); 1211 1212 if (c->err && !(c->err & (REDIS_ERR_IO | REDIS_ERR_EOF))) { 1213 fprintf(stderr, "Error: %s\n", c->errstr); 1214 exit(1); 1215 } else if (tries > 0) { 1216 printf("\r\x1b[0K"); /* Cursor to left edge + clear line. */ 1217 } 1218 } 1219 1220 context = c; 1221 return reply; 1222 } 1223 1224 /*------------------------------------------------------------------------------ 1225 * User interface 1226 *--------------------------------------------------------------------------- */ 1227 1228 static int parseOptions(int argc, char **argv) { 1229 int i; 1230 1231 for (i = 1; i < argc; i++) { 1232 int lastarg = i==argc-1; 1233 1234 if (!strcmp(argv[i],"-h") && !lastarg) { 1235 sdsfree(config.hostip); 1236 config.hostip = sdsnew(argv[++i]); 1237 } else if (!strcmp(argv[i],"-h") && lastarg) { 1238 usage(); 1239 } else if (!strcmp(argv[i],"--help")) { 1240 usage(); 1241 } else if (!strcmp(argv[i],"-x")) { 1242 config.stdinarg = 1; 1243 } else if (!strcmp(argv[i],"-p") && !lastarg) { 1244 config.hostport = atoi(argv[++i]); 1245 } else if (!strcmp(argv[i],"-s") && !lastarg) { 1246 config.hostsocket = argv[++i]; 1247 } else if (!strcmp(argv[i],"-r") && !lastarg) { 1248 config.repeat = strtoll(argv[++i],NULL,10); 1249 } else if (!strcmp(argv[i],"-i") && !lastarg) { 1250 double seconds = atof(argv[++i]); 1251 config.interval = seconds*1000000; 1252 } else if (!strcmp(argv[i],"-n") && !lastarg) { 1253 config.dbnum = atoi(argv[++i]); 1254 } else if (!strcmp(argv[i], "--no-auth-warning")) { 1255 config.no_auth_warning = 1; 1256 } else if (!strcmp(argv[i],"-a") && !lastarg) { 1257 config.auth = argv[++i]; 1258 } else if (!strcmp(argv[i],"-u") && !lastarg) { 1259 parseRedisUri(argv[++i]); 1260 } else if (!strcmp(argv[i],"--raw")) { 1261 config.output = OUTPUT_RAW; 1262 } else if (!strcmp(argv[i],"--no-raw")) { 1263 config.output = OUTPUT_STANDARD; 1264 } else if (!strcmp(argv[i],"--csv")) { 1265 config.output = OUTPUT_CSV; 1266 } else if (!strcmp(argv[i],"--latency")) { 1267 config.latency_mode = 1; 1268 } else if (!strcmp(argv[i],"--latency-dist")) { 1269 config.latency_dist_mode = 1; 1270 } else if (!strcmp(argv[i],"--mono")) { 1271 spectrum_palette = spectrum_palette_mono; 1272 spectrum_palette_size = spectrum_palette_mono_size; 1273 } else if (!strcmp(argv[i],"--latency-history")) { 1274 config.latency_mode = 1; 1275 config.latency_history = 1; 1276 } else if (!strcmp(argv[i],"--lru-test") && !lastarg) { 1277 config.lru_test_mode = 1; 1278 config.lru_test_sample_size = strtoll(argv[++i],NULL,10); 1279 } else if (!strcmp(argv[i],"--slave")) { 1280 config.slave_mode = 1; 1281 } else if (!strcmp(argv[i],"--replica")) { 1282 config.slave_mode = 1; 1283 } else if (!strcmp(argv[i],"--stat")) { 1284 config.stat_mode = 1; 1285 } else if (!strcmp(argv[i],"--scan")) { 1286 config.scan_mode = 1; 1287 } else if (!strcmp(argv[i],"--pattern") && !lastarg) { 1288 config.pattern = argv[++i]; 1289 } else if (!strcmp(argv[i],"--intrinsic-latency") && !lastarg) { 1290 config.intrinsic_latency_mode = 1; 1291 config.intrinsic_latency_duration = atoi(argv[++i]); 1292 } else if (!strcmp(argv[i],"--rdb") && !lastarg) { 1293 config.getrdb_mode = 1; 1294 config.rdb_filename = argv[++i]; 1295 } else if (!strcmp(argv[i],"--pipe")) { 1296 config.pipe_mode = 1; 1297 } else if (!strcmp(argv[i],"--pipe-timeout") && !lastarg) { 1298 config.pipe_timeout = atoi(argv[++i]); 1299 } else if (!strcmp(argv[i],"--bigkeys")) { 1300 config.bigkeys = 1; 1301 } else if (!strcmp(argv[i],"--memkeys")) { 1302 config.memkeys = 1; 1303 config.memkeys_samples = 0; /* use redis default */ 1304 } else if (!strcmp(argv[i],"--memkeys-samples")) { 1305 config.memkeys = 1; 1306 config.memkeys_samples = atoi(argv[++i]); 1307 } else if (!strcmp(argv[i],"--hotkeys")) { 1308 config.hotkeys = 1; 1309 } else if (!strcmp(argv[i],"--eval") && !lastarg) { 1310 config.eval = argv[++i]; 1311 } else if (!strcmp(argv[i],"--ldb")) { 1312 config.eval_ldb = 1; 1313 config.output = OUTPUT_RAW; 1314 } else if (!strcmp(argv[i],"--ldb-sync-mode")) { 1315 config.eval_ldb = 1; 1316 config.eval_ldb_sync = 1; 1317 config.output = OUTPUT_RAW; 1318 } else if (!strcmp(argv[i],"-c")) { 1319 config.cluster_mode = 1; 1320 } else if (!strcmp(argv[i],"-d") && !lastarg) { 1321 sdsfree(config.mb_delim); 1322 config.mb_delim = sdsnew(argv[++i]); 1323 } else if (!strcmp(argv[i],"--verbose")) { 1324 config.verbose = 1; 1325 } else if (!strcmp(argv[i],"--cluster") && !lastarg) { 1326 if (CLUSTER_MANAGER_MODE()) usage(); 1327 char *cmd = argv[++i]; 1328 int j = i; 1329 while (j < argc && argv[j][0] != '-') j++; 1330 if (j > i) j--; 1331 createClusterManagerCommand(cmd, j - i, argv + i + 1); 1332 i = j; 1333 } else if (!strcmp(argv[i],"--cluster") && lastarg) { 1334 usage(); 1335 } else if (!strcmp(argv[i],"--cluster-replicas") && !lastarg) { 1336 config.cluster_manager_command.replicas = atoi(argv[++i]); 1337 } else if (!strcmp(argv[i],"--cluster-master-id") && !lastarg) { 1338 config.cluster_manager_command.master_id = argv[++i]; 1339 } else if (!strcmp(argv[i],"--cluster-from") && !lastarg) { 1340 config.cluster_manager_command.from = argv[++i]; 1341 } else if (!strcmp(argv[i],"--cluster-to") && !lastarg) { 1342 config.cluster_manager_command.to = argv[++i]; 1343 } else if (!strcmp(argv[i],"--cluster-weight") && !lastarg) { 1344 if (config.cluster_manager_command.weight != NULL) { 1345 fprintf(stderr, "WARNING: you cannot use --cluster-weight " 1346 "more than once.\n" 1347 "You can set more weights by adding them " 1348 "as a space-separated list, ie:\n" 1349 "--cluster-weight n1=w n2=w\n"); 1350 exit(1); 1351 } 1352 int widx = i + 1; 1353 char **weight = argv + widx; 1354 int wargc = 0; 1355 for (; widx < argc; widx++) { 1356 if (strstr(argv[widx], "--") == argv[widx]) break; 1357 if (strchr(argv[widx], '=') == NULL) break; 1358 wargc++; 1359 } 1360 if (wargc > 0) { 1361 config.cluster_manager_command.weight = weight; 1362 config.cluster_manager_command.weight_argc = wargc; 1363 i += wargc; 1364 } 1365 } else if (!strcmp(argv[i],"--cluster-slots") && !lastarg) { 1366 config.cluster_manager_command.slots = atoi(argv[++i]); 1367 } else if (!strcmp(argv[i],"--cluster-timeout") && !lastarg) { 1368 config.cluster_manager_command.timeout = atoi(argv[++i]); 1369 } else if (!strcmp(argv[i],"--cluster-pipeline") && !lastarg) { 1370 config.cluster_manager_command.pipeline = atoi(argv[++i]); 1371 } else if (!strcmp(argv[i],"--cluster-threshold") && !lastarg) { 1372 config.cluster_manager_command.threshold = atof(argv[++i]); 1373 } else if (!strcmp(argv[i],"--cluster-yes")) { 1374 config.cluster_manager_command.flags |= 1375 CLUSTER_MANAGER_CMD_FLAG_YES; 1376 } else if (!strcmp(argv[i],"--cluster-simulate")) { 1377 config.cluster_manager_command.flags |= 1378 CLUSTER_MANAGER_CMD_FLAG_SIMULATE; 1379 } else if (!strcmp(argv[i],"--cluster-replace")) { 1380 config.cluster_manager_command.flags |= 1381 CLUSTER_MANAGER_CMD_FLAG_REPLACE; 1382 } else if (!strcmp(argv[i],"--cluster-copy")) { 1383 config.cluster_manager_command.flags |= 1384 CLUSTER_MANAGER_CMD_FLAG_COPY; 1385 } else if (!strcmp(argv[i],"--cluster-slave")) { 1386 config.cluster_manager_command.flags |= 1387 CLUSTER_MANAGER_CMD_FLAG_SLAVE; 1388 } else if (!strcmp(argv[i],"--cluster-use-empty-masters")) { 1389 config.cluster_manager_command.flags |= 1390 CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER; 1391 } else if (!strcmp(argv[i],"--cluster-search-multiple-owners")) { 1392 config.cluster_manager_command.flags |= 1393 CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS; 1394 } else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) { 1395 sds version = cliVersion(); 1396 printf("redis-cli %s\n", version); 1397 sdsfree(version); 1398 exit(0); 1399 } else if (CLUSTER_MANAGER_MODE() && argv[i][0] != '-') { 1400 if (config.cluster_manager_command.argc == 0) { 1401 int j = i + 1; 1402 while (j < argc && argv[j][0] != '-') j++; 1403 int cmd_argc = j - i; 1404 config.cluster_manager_command.argc = cmd_argc; 1405 config.cluster_manager_command.argv = argv + i; 1406 if (cmd_argc > 1) i = j - 1; 1407 } 1408 } else { 1409 if (argv[i][0] == '-') { 1410 fprintf(stderr, 1411 "Unrecognized option or bad number of args for: '%s'\n", 1412 argv[i]); 1413 exit(1); 1414 } else { 1415 /* Likely the command name, stop here. */ 1416 break; 1417 } 1418 } 1419 } 1420 1421 /* --ldb requires --eval. */ 1422 if (config.eval_ldb && config.eval == NULL) { 1423 fprintf(stderr,"Options --ldb and --ldb-sync-mode require --eval.\n"); 1424 fprintf(stderr,"Try %s --help for more information.\n", argv[0]); 1425 exit(1); 1426 } 1427 1428 if (!config.no_auth_warning && config.auth != NULL) { 1429 fputs("Warning: Using a password with '-a' or '-u' option on the command" 1430 " line interface may not be safe.\n", stderr); 1431 } 1432 1433 return i; 1434 } 1435 1436 static void parseEnv() { 1437 /* Set auth from env, but do not overwrite CLI arguments if passed */ 1438 char *auth = getenv(REDIS_CLI_AUTH_ENV); 1439 if (auth != NULL && config.auth == NULL) { 1440 config.auth = auth; 1441 } 1442 1443 char *cluster_yes = getenv(REDIS_CLI_CLUSTER_YES_ENV); 1444 if (cluster_yes != NULL && !strcmp(cluster_yes, "1")) { 1445 config.cluster_manager_command.flags |= CLUSTER_MANAGER_CMD_FLAG_YES; 1446 } 1447 } 1448 1449 static sds readArgFromStdin(void) { 1450 char buf[1024]; 1451 sds arg = sdsempty(); 1452 1453 while(1) { 1454 int nread = read(fileno(stdin),buf,1024); 1455 1456 if (nread == 0) break; 1457 else if (nread == -1) { 1458 perror("Reading from standard input"); 1459 exit(1); 1460 } 1461 arg = sdscatlen(arg,buf,nread); 1462 } 1463 return arg; 1464 } 1465 1466 static void usage(void) { 1467 sds version = cliVersion(); 1468 fprintf(stderr, 1469 "redis-cli %s\n" 1470 "\n" 1471 "Usage: redis-cli [OPTIONS] [cmd [arg [arg ...]]]\n" 1472 " -h <hostname> Server hostname (default: 127.0.0.1).\n" 1473 " -p <port> Server port (default: 6379).\n" 1474 " -s <socket> Server socket (overrides hostname and port).\n" 1475 " -a <password> Password to use when connecting to the server.\n" 1476 " You can also use the " REDIS_CLI_AUTH_ENV " environment\n" 1477 " variable to pass this password more safely\n" 1478 " (if both are used, this argument takes predecence).\n" 1479 " -u <uri> Server URI.\n" 1480 " -r <repeat> Execute specified command N times.\n" 1481 " -i <interval> When -r is used, waits <interval> seconds per command.\n" 1482 " It is possible to specify sub-second times like -i 0.1.\n" 1483 " -n <db> Database number.\n" 1484 " -x Read last argument from STDIN.\n" 1485 " -d <delimiter> Multi-bulk delimiter in for raw formatting (default: \\n).\n" 1486 " -c Enable cluster mode (follow -ASK and -MOVED redirections).\n" 1487 " --raw Use raw formatting for replies (default when STDOUT is\n" 1488 " not a tty).\n" 1489 " --no-raw Force formatted output even when STDOUT is not a tty.\n" 1490 " --csv Output in CSV format.\n" 1491 " --stat Print rolling stats about server: mem, clients, ...\n" 1492 " --latency Enter a special mode continuously sampling latency.\n" 1493 " If you use this mode in an interactive session it runs\n" 1494 " forever displaying real-time stats. Otherwise if --raw or\n" 1495 " --csv is specified, or if you redirect the output to a non\n" 1496 " TTY, it samples the latency for 1 second (you can use\n" 1497 " -i to change the interval), then produces a single output\n" 1498 " and exits.\n" 1499 " --latency-history Like --latency but tracking latency changes over time.\n" 1500 " Default time interval is 15 sec. Change it using -i.\n" 1501 " --latency-dist Shows latency as a spectrum, requires xterm 256 colors.\n" 1502 " Default time interval is 1 sec. Change it using -i.\n" 1503 " --lru-test <keys> Simulate a cache workload with an 80-20 distribution.\n" 1504 " --replica Simulate a replica showing commands received from the master.\n" 1505 " --rdb <filename> Transfer an RDB dump from remote server to local file.\n" 1506 " --pipe Transfer raw Redis protocol from stdin to server.\n" 1507 " --pipe-timeout <n> In --pipe mode, abort with error if after sending all data.\n" 1508 " no reply is received within <n> seconds.\n" 1509 " Default timeout: %d. Use 0 to wait forever.\n" 1510 " --bigkeys Sample Redis keys looking for keys with many elements (complexity).\n" 1511 " --memkeys Sample Redis keys looking for keys consuming a lot of memory.\n" 1512 " --memkeys-samples <n> Sample Redis keys looking for keys consuming a lot of memory.\n" 1513 " And define number of key elements to sample\n" 1514 " --hotkeys Sample Redis keys looking for hot keys.\n" 1515 " only works when maxmemory-policy is *lfu.\n" 1516 " --scan List all keys using the SCAN command.\n" 1517 " --pattern <pat> Useful with --scan to specify a SCAN pattern.\n" 1518 " --intrinsic-latency <sec> Run a test to measure intrinsic system latency.\n" 1519 " The test will run for the specified amount of seconds.\n" 1520 " --eval <file> Send an EVAL command using the Lua script at <file>.\n" 1521 " --ldb Used with --eval enable the Redis Lua debugger.\n" 1522 " --ldb-sync-mode Like --ldb but uses the synchronous Lua debugger, in\n" 1523 " this mode the server is blocked and script changes are\n" 1524 " not rolled back from the server memory.\n" 1525 " --cluster <command> [args...] [opts...]\n" 1526 " Cluster Manager command and arguments (see below).\n" 1527 " --verbose Verbose mode.\n" 1528 " --no-auth-warning Don't show warning message when using password on command\n" 1529 " line interface.\n" 1530 " --help Output this help and exit.\n" 1531 " --version Output version and exit.\n" 1532 "\n", 1533 version, REDIS_CLI_DEFAULT_PIPE_TIMEOUT); 1534 /* Using another fprintf call to avoid -Woverlength-strings compile warning */ 1535 fprintf(stderr, 1536 "Cluster Manager Commands:\n" 1537 " Use --cluster help to list all available cluster manager commands.\n" 1538 "\n" 1539 "Examples:\n" 1540 " cat /etc/passwd | redis-cli -x set mypasswd\n" 1541 " redis-cli get mypasswd\n" 1542 " redis-cli -r 100 lpush mylist x\n" 1543 " redis-cli -r 100 -i 1 info | grep used_memory_human:\n" 1544 " redis-cli --eval myscript.lua key1 key2 , arg1 arg2 arg3\n" 1545 " redis-cli --scan --pattern '*:12345*'\n" 1546 "\n" 1547 " (Note: when using --eval the comma separates KEYS[] from ARGV[] items)\n" 1548 "\n" 1549 "When no command is given, redis-cli starts in interactive mode.\n" 1550 "Type \"help\" in interactive mode for information on available commands\n" 1551 "and settings.\n" 1552 "\n"); 1553 sdsfree(version); 1554 exit(1); 1555 } 1556 1557 static int confirmWithYes(char *msg) { 1558 if (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_YES) { 1559 return 1; 1560 } 1561 printf("%s (type 'yes' to accept): ", msg); 1562 fflush(stdout); 1563 char buf[4]; 1564 int nread = read(fileno(stdin),buf,4); 1565 buf[3] = '\0'; 1566 return (nread != 0 && !strcmp("yes", buf)); 1567 } 1568 1569 /* Turn the plain C strings into Sds strings */ 1570 static char **convertToSds(int count, char** args) { 1571 int j; 1572 char **sds = zmalloc(sizeof(char*)*count); 1573 1574 for(j = 0; j < count; j++) 1575 sds[j] = sdsnew(args[j]); 1576 1577 return sds; 1578 } 1579 1580 static int issueCommandRepeat(int argc, char **argv, long repeat) { 1581 while (1) { 1582 config.cluster_reissue_command = 0; 1583 if (cliSendCommand(argc,argv,repeat) != REDIS_OK) { 1584 cliConnect(CC_FORCE); 1585 1586 /* If we still cannot send the command print error. 1587 * We'll try to reconnect the next time. */ 1588 if (cliSendCommand(argc,argv,repeat) != REDIS_OK) { 1589 cliPrintContextError(); 1590 return REDIS_ERR; 1591 } 1592 } 1593 /* Issue the command again if we got redirected in cluster mode */ 1594 if (config.cluster_mode && config.cluster_reissue_command) { 1595 cliConnect(CC_FORCE); 1596 } else { 1597 break; 1598 } 1599 } 1600 return REDIS_OK; 1601 } 1602 1603 static int issueCommand(int argc, char **argv) { 1604 return issueCommandRepeat(argc, argv, config.repeat); 1605 } 1606 1607 /* Split the user provided command into multiple SDS arguments. 1608 * This function normally uses sdssplitargs() from sds.c which is able 1609 * to understand "quoted strings", escapes and so forth. However when 1610 * we are in Lua debugging mode and the "eval" command is used, we want 1611 * the remaining Lua script (after "e " or "eval ") to be passed verbatim 1612 * as a single big argument. */ 1613 static sds *cliSplitArgs(char *line, int *argc) { 1614 if (config.eval_ldb && (strstr(line,"eval ") == line || 1615 strstr(line,"e ") == line)) 1616 { 1617 sds *argv = sds_malloc(sizeof(sds)*2); 1618 *argc = 2; 1619 int len = strlen(line); 1620 int elen = line[1] == ' ' ? 2 : 5; /* "e " or "eval "? */ 1621 argv[0] = sdsnewlen(line,elen-1); 1622 argv[1] = sdsnewlen(line+elen,len-elen); 1623 return argv; 1624 } else { 1625 return sdssplitargs(line,argc); 1626 } 1627 } 1628 1629 /* Set the CLI preferences. This function is invoked when an interactive 1630 * ":command" is called, or when reading ~/.redisclirc file, in order to 1631 * set user preferences. */ 1632 void cliSetPreferences(char **argv, int argc, int interactive) { 1633 if (!strcasecmp(argv[0],":set") && argc >= 2) { 1634 if (!strcasecmp(argv[1],"hints")) pref.hints = 1; 1635 else if (!strcasecmp(argv[1],"nohints")) pref.hints = 0; 1636 else { 1637 printf("%sunknown redis-cli preference '%s'\n", 1638 interactive ? "" : ".redisclirc: ", 1639 argv[1]); 1640 } 1641 } else { 1642 printf("%sunknown redis-cli internal command '%s'\n", 1643 interactive ? "" : ".redisclirc: ", 1644 argv[0]); 1645 } 1646 } 1647 1648 /* Load the ~/.redisclirc file if any. */ 1649 void cliLoadPreferences(void) { 1650 sds rcfile = getDotfilePath(REDIS_CLI_RCFILE_ENV,REDIS_CLI_RCFILE_DEFAULT); 1651 if (rcfile == NULL) return; 1652 FILE *fp = fopen(rcfile,"r"); 1653 char buf[1024]; 1654 1655 if (fp) { 1656 while(fgets(buf,sizeof(buf),fp) != NULL) { 1657 sds *argv; 1658 int argc; 1659 1660 argv = sdssplitargs(buf,&argc); 1661 if (argc > 0) cliSetPreferences(argv,argc,0); 1662 sdsfreesplitres(argv,argc); 1663 } 1664 fclose(fp); 1665 } 1666 sdsfree(rcfile); 1667 } 1668 1669 static void repl(void) { 1670 sds historyfile = NULL; 1671 int history = 0; 1672 char *line; 1673 int argc; 1674 sds *argv; 1675 1676 /* Initialize the help and, if possible, use the COMMAND command in order 1677 * to retrieve missing entries. */ 1678 cliInitHelp(); 1679 cliIntegrateHelp(); 1680 1681 config.interactive = 1; 1682 linenoiseSetMultiLine(1); 1683 linenoiseSetCompletionCallback(completionCallback); 1684 linenoiseSetHintsCallback(hintsCallback); 1685 linenoiseSetFreeHintsCallback(freeHintsCallback); 1686 1687 /* Only use history and load the rc file when stdin is a tty. */ 1688 if (isatty(fileno(stdin))) { 1689 historyfile = getDotfilePath(REDIS_CLI_HISTFILE_ENV,REDIS_CLI_HISTFILE_DEFAULT); 1690 //keep in-memory history always regardless if history file can be determined 1691 history = 1; 1692 if (historyfile != NULL) { 1693 linenoiseHistoryLoad(historyfile); 1694 } 1695 cliLoadPreferences(); 1696 } 1697 1698 cliRefreshPrompt(); 1699 while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) { 1700 if (line[0] != '\0') { 1701 long repeat = 1; 1702 int skipargs = 0; 1703 char *endptr = NULL; 1704 1705 argv = cliSplitArgs(line,&argc); 1706 1707 /* check if we have a repeat command option and 1708 * need to skip the first arg */ 1709 if (argv && argc > 0) { 1710 errno = 0; 1711 repeat = strtol(argv[0], &endptr, 10); 1712 if (argc > 1 && *endptr == '\0') { 1713 if (errno == ERANGE || errno == EINVAL || repeat <= 0) { 1714 fputs("Invalid redis-cli repeat command option value.\n", stdout); 1715 sdsfreesplitres(argv, argc); 1716 linenoiseFree(line); 1717 continue; 1718 } 1719 skipargs = 1; 1720 } else { 1721 repeat = 1; 1722 } 1723 } 1724 1725 /* Won't save auth command in history file */ 1726 if (!(argv && argc > 0 && !strcasecmp(argv[0+skipargs], "auth"))) { 1727 if (history) linenoiseHistoryAdd(line); 1728 if (historyfile) linenoiseHistorySave(historyfile); 1729 } 1730 1731 if (argv == NULL) { 1732 printf("Invalid argument(s)\n"); 1733 linenoiseFree(line); 1734 continue; 1735 } else if (argc > 0) { 1736 if (strcasecmp(argv[0],"quit") == 0 || 1737 strcasecmp(argv[0],"exit") == 0) 1738 { 1739 exit(0); 1740 } else if (argv[0][0] == ':') { 1741 cliSetPreferences(argv,argc,1); 1742 sdsfreesplitres(argv,argc); 1743 linenoiseFree(line); 1744 continue; 1745 } else if (strcasecmp(argv[0],"restart") == 0) { 1746 if (config.eval) { 1747 config.eval_ldb = 1; 1748 config.output = OUTPUT_RAW; 1749 return; /* Return to evalMode to restart the session. */ 1750 } else { 1751 printf("Use 'restart' only in Lua debugging mode."); 1752 } 1753 } else if (argc == 3 && !strcasecmp(argv[0],"connect")) { 1754 sdsfree(config.hostip); 1755 config.hostip = sdsnew(argv[1]); 1756 config.hostport = atoi(argv[2]); 1757 cliRefreshPrompt(); 1758 cliConnect(CC_FORCE); 1759 } else if (argc == 1 && !strcasecmp(argv[0],"clear")) { 1760 linenoiseClearScreen(); 1761 } else { 1762 long long start_time = mstime(), elapsed; 1763 1764 issueCommandRepeat(argc-skipargs, argv+skipargs, repeat); 1765 1766 /* If our debugging session ended, show the EVAL final 1767 * reply. */ 1768 if (config.eval_ldb_end) { 1769 config.eval_ldb_end = 0; 1770 cliReadReply(0); 1771 printf("\n(Lua debugging session ended%s)\n\n", 1772 config.eval_ldb_sync ? "" : 1773 " -- dataset changes rolled back"); 1774 } 1775 1776 elapsed = mstime()-start_time; 1777 if (elapsed >= 500 && 1778 config.output == OUTPUT_STANDARD) 1779 { 1780 printf("(%.2fs)\n",(double)elapsed/1000); 1781 } 1782 } 1783 } 1784 /* Free the argument vector */ 1785 sdsfreesplitres(argv,argc); 1786 } 1787 /* linenoise() returns malloc-ed lines like readline() */ 1788 linenoiseFree(line); 1789 } 1790 exit(0); 1791 } 1792 1793 static int noninteractive(int argc, char **argv) { 1794 int retval = 0; 1795 if (config.stdinarg) { 1796 argv = zrealloc(argv, (argc+1)*sizeof(char*)); 1797 argv[argc] = readArgFromStdin(); 1798 retval = issueCommand(argc+1, argv); 1799 } else { 1800 retval = issueCommand(argc, argv); 1801 } 1802 return retval; 1803 } 1804 1805 /*------------------------------------------------------------------------------ 1806 * Eval mode 1807 *--------------------------------------------------------------------------- */ 1808 1809 static int evalMode(int argc, char **argv) { 1810 sds script = NULL; 1811 FILE *fp; 1812 char buf[1024]; 1813 size_t nread; 1814 char **argv2; 1815 int j, got_comma, keys; 1816 int retval = REDIS_OK; 1817 1818 while(1) { 1819 if (config.eval_ldb) { 1820 printf( 1821 "Lua debugging session started, please use:\n" 1822 "quit -- End the session.\n" 1823 "restart -- Restart the script in debug mode again.\n" 1824 "help -- Show Lua script debugging commands.\n\n" 1825 ); 1826 } 1827 1828 sdsfree(script); 1829 script = sdsempty(); 1830 got_comma = 0; 1831 keys = 0; 1832 1833 /* Load the script from the file, as an sds string. */ 1834 fp = fopen(config.eval,"r"); 1835 if (!fp) { 1836 fprintf(stderr, 1837 "Can't open file '%s': %s\n", config.eval, strerror(errno)); 1838 exit(1); 1839 } 1840 while((nread = fread(buf,1,sizeof(buf),fp)) != 0) { 1841 script = sdscatlen(script,buf,nread); 1842 } 1843 fclose(fp); 1844 1845 /* If we are debugging a script, enable the Lua debugger. */ 1846 if (config.eval_ldb) { 1847 redisReply *reply = redisCommand(context, 1848 config.eval_ldb_sync ? 1849 "SCRIPT DEBUG sync": "SCRIPT DEBUG yes"); 1850 if (reply) freeReplyObject(reply); 1851 } 1852 1853 /* Create our argument vector */ 1854 argv2 = zmalloc(sizeof(sds)*(argc+3)); 1855 argv2[0] = sdsnew("EVAL"); 1856 argv2[1] = script; 1857 for (j = 0; j < argc; j++) { 1858 if (!got_comma && argv[j][0] == ',' && argv[j][1] == 0) { 1859 got_comma = 1; 1860 continue; 1861 } 1862 argv2[j+3-got_comma] = sdsnew(argv[j]); 1863 if (!got_comma) keys++; 1864 } 1865 argv2[2] = sdscatprintf(sdsempty(),"%d",keys); 1866 1867 /* Call it */ 1868 int eval_ldb = config.eval_ldb; /* Save it, may be reverteed. */ 1869 retval = issueCommand(argc+3-got_comma, argv2); 1870 if (eval_ldb) { 1871 if (!config.eval_ldb) { 1872 /* If the debugging session ended immediately, there was an 1873 * error compiling the script. Show it and they don't enter 1874 * the REPL at all. */ 1875 printf("Eval debugging session can't start:\n"); 1876 cliReadReply(0); 1877 break; /* Return to the caller. */ 1878 } else { 1879 strncpy(config.prompt,"lua debugger> ",sizeof(config.prompt)); 1880 repl(); 1881 /* Restart the session if repl() returned. */ 1882 cliConnect(CC_FORCE); 1883 printf("\n"); 1884 } 1885 } else { 1886 break; /* Return to the caller. */ 1887 } 1888 } 1889 return retval; 1890 } 1891 1892 /*------------------------------------------------------------------------------ 1893 * Cluster Manager 1894 *--------------------------------------------------------------------------- */ 1895 1896 /* The Cluster Manager global structure */ 1897 static struct clusterManager { 1898 list *nodes; /* List of nodes in the configuration. */ 1899 list *errors; 1900 } cluster_manager; 1901 1902 /* Used by clusterManagerFixSlotsCoverage */ 1903 dict *clusterManagerUncoveredSlots = NULL; 1904 1905 typedef struct clusterManagerNode { 1906 redisContext *context; 1907 sds name; 1908 char *ip; 1909 int port; 1910 uint64_t current_epoch; 1911 time_t ping_sent; 1912 time_t ping_recv; 1913 int flags; 1914 list *flags_str; /* Flags string representations */ 1915 sds replicate; /* Master ID if node is a slave */ 1916 int dirty; /* Node has changes that can be flushed */ 1917 uint8_t slots[CLUSTER_MANAGER_SLOTS]; 1918 int slots_count; 1919 int replicas_count; 1920 list *friends; 1921 sds *migrating; /* An array of sds where even strings are slots and odd 1922 * strings are the destination node IDs. */ 1923 sds *importing; /* An array of sds where even strings are slots and odd 1924 * strings are the source node IDs. */ 1925 int migrating_count; /* Length of the migrating array (migrating slots*2) */ 1926 int importing_count; /* Length of the importing array (importing slots*2) */ 1927 float weight; /* Weight used by rebalance */ 1928 int balance; /* Used by rebalance */ 1929 } clusterManagerNode; 1930 1931 /* Data structure used to represent a sequence of cluster nodes. */ 1932 typedef struct clusterManagerNodeArray { 1933 clusterManagerNode **nodes; /* Actual nodes array */ 1934 clusterManagerNode **alloc; /* Pointer to the allocated memory */ 1935 int len; /* Actual length of the array */ 1936 int count; /* Non-NULL nodes count */ 1937 } clusterManagerNodeArray; 1938 1939 /* Used for the reshard table. */ 1940 typedef struct clusterManagerReshardTableItem { 1941 clusterManagerNode *source; 1942 int slot; 1943 } clusterManagerReshardTableItem; 1944 1945 static dictType clusterManagerDictType = { 1946 dictSdsHash, /* hash function */ 1947 NULL, /* key dup */ 1948 NULL, /* val dup */ 1949 dictSdsKeyCompare, /* key compare */ 1950 NULL, /* key destructor */ 1951 dictSdsDestructor /* val destructor */ 1952 }; 1953 1954 typedef int clusterManagerCommandProc(int argc, char **argv); 1955 typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx); 1956 1957 /* Cluster Manager helper functions */ 1958 1959 static clusterManagerNode *clusterManagerNewNode(char *ip, int port); 1960 static clusterManagerNode *clusterManagerNodeByName(const char *name); 1961 static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char *n); 1962 static void clusterManagerNodeResetSlots(clusterManagerNode *node); 1963 static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err); 1964 static void clusterManagerPrintNotClusterNodeError(clusterManagerNode *node, 1965 char *err); 1966 static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts, 1967 char **err); 1968 static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts); 1969 static int clusterManagerNodeIsEmpty(clusterManagerNode *node, char **err); 1970 static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes, 1971 int ip_count, clusterManagerNode ***offending, int *offending_len); 1972 static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes, 1973 int ip_count); 1974 static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent); 1975 static void clusterManagerShowNodes(void); 1976 static void clusterManagerShowClusterInfo(void); 1977 static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err); 1978 static void clusterManagerWaitForClusterJoin(void); 1979 static int clusterManagerCheckCluster(int quiet); 1980 static void clusterManagerLog(int level, const char* fmt, ...); 1981 static int clusterManagerIsConfigConsistent(void); 1982 static void clusterManagerOnError(sds err); 1983 static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array, 1984 int len); 1985 static void clusterManagerNodeArrayReset(clusterManagerNodeArray *array); 1986 static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array, 1987 clusterManagerNode **nodeptr); 1988 static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array, 1989 clusterManagerNode *node); 1990 1991 /* Cluster Manager commands. */ 1992 1993 static int clusterManagerCommandCreate(int argc, char **argv); 1994 static int clusterManagerCommandAddNode(int argc, char **argv); 1995 static int clusterManagerCommandDeleteNode(int argc, char **argv); 1996 static int clusterManagerCommandInfo(int argc, char **argv); 1997 static int clusterManagerCommandCheck(int argc, char **argv); 1998 static int clusterManagerCommandFix(int argc, char **argv); 1999 static int clusterManagerCommandReshard(int argc, char **argv); 2000 static int clusterManagerCommandRebalance(int argc, char **argv); 2001 static int clusterManagerCommandSetTimeout(int argc, char **argv); 2002 static int clusterManagerCommandImport(int argc, char **argv); 2003 static int clusterManagerCommandCall(int argc, char **argv); 2004 static int clusterManagerCommandHelp(int argc, char **argv); 2005 2006 typedef struct clusterManagerCommandDef { 2007 char *name; 2008 clusterManagerCommandProc *proc; 2009 int arity; 2010 char *args; 2011 char *options; 2012 } clusterManagerCommandDef; 2013 2014 clusterManagerCommandDef clusterManagerCommands[] = { 2015 {"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN", 2016 "replicas <arg>"}, 2017 {"check", clusterManagerCommandCheck, -1, "host:port", 2018 "search-multiple-owners"}, 2019 {"info", clusterManagerCommandInfo, -1, "host:port", NULL}, 2020 {"fix", clusterManagerCommandFix, -1, "host:port", 2021 "search-multiple-owners"}, 2022 {"reshard", clusterManagerCommandReshard, -1, "host:port", 2023 "from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>," 2024 "replace"}, 2025 {"rebalance", clusterManagerCommandRebalance, -1, "host:port", 2026 "weight <node1=w1...nodeN=wN>,use-empty-masters," 2027 "timeout <arg>,simulate,pipeline <arg>,threshold <arg>,replace"}, 2028 {"add-node", clusterManagerCommandAddNode, 2, 2029 "new_host:new_port existing_host:existing_port", "slave,master-id <arg>"}, 2030 {"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL}, 2031 {"call", clusterManagerCommandCall, -2, 2032 "host:port command arg arg .. arg", NULL}, 2033 {"set-timeout", clusterManagerCommandSetTimeout, 2, 2034 "host:port milliseconds", NULL}, 2035 {"import", clusterManagerCommandImport, 1, "host:port", 2036 "from <arg>,copy,replace"}, 2037 {"help", clusterManagerCommandHelp, 0, NULL, NULL} 2038 }; 2039 2040 2041 static void createClusterManagerCommand(char *cmdname, int argc, char **argv) { 2042 clusterManagerCommand *cmd = &config.cluster_manager_command; 2043 cmd->name = cmdname; 2044 cmd->argc = argc; 2045 cmd->argv = argc ? argv : NULL; 2046 if (isColorTerm()) cmd->flags |= CLUSTER_MANAGER_CMD_FLAG_COLOR; 2047 } 2048 2049 2050 static clusterManagerCommandProc *validateClusterManagerCommand(void) { 2051 int i, commands_count = sizeof(clusterManagerCommands) / 2052 sizeof(clusterManagerCommandDef); 2053 clusterManagerCommandProc *proc = NULL; 2054 char *cmdname = config.cluster_manager_command.name; 2055 int argc = config.cluster_manager_command.argc; 2056 for (i = 0; i < commands_count; i++) { 2057 clusterManagerCommandDef cmddef = clusterManagerCommands[i]; 2058 if (!strcmp(cmddef.name, cmdname)) { 2059 if ((cmddef.arity > 0 && argc != cmddef.arity) || 2060 (cmddef.arity < 0 && argc < (cmddef.arity * -1))) { 2061 fprintf(stderr, "[ERR] Wrong number of arguments for " 2062 "specified --cluster sub command\n"); 2063 return NULL; 2064 } 2065 proc = cmddef.proc; 2066 } 2067 } 2068 if (!proc) fprintf(stderr, "Unknown --cluster subcommand\n"); 2069 return proc; 2070 } 2071 2072 /* Get host ip and port from command arguments. If only one argument has 2073 * been provided it must be in the form of 'ip:port', elsewhere 2074 * the first argument must be the ip and the second one the port. 2075 * If host and port can be detected, it returns 1 and it stores host and 2076 * port into variables referenced by'ip_ptr' and 'port_ptr' pointers, 2077 * elsewhere it returns 0. */ 2078 static int getClusterHostFromCmdArgs(int argc, char **argv, 2079 char **ip_ptr, int *port_ptr) { 2080 int port = 0; 2081 char *ip = NULL; 2082 if (argc == 1) { 2083 char *addr = argv[0]; 2084 char *c = strrchr(addr, '@'); 2085 if (c != NULL) *c = '\0'; 2086 c = strrchr(addr, ':'); 2087 if (c != NULL) { 2088 *c = '\0'; 2089 ip = addr; 2090 port = atoi(++c); 2091 } else return 0; 2092 } else { 2093 ip = argv[0]; 2094 port = atoi(argv[1]); 2095 } 2096 if (!ip || !port) return 0; 2097 else { 2098 *ip_ptr = ip; 2099 *port_ptr = port; 2100 } 2101 return 1; 2102 } 2103 2104 static void freeClusterManagerNodeFlags(list *flags) { 2105 listIter li; 2106 listNode *ln; 2107 listRewind(flags, &li); 2108 while ((ln = listNext(&li)) != NULL) { 2109 sds flag = ln->value; 2110 sdsfree(flag); 2111 } 2112 listRelease(flags); 2113 } 2114 2115 static void freeClusterManagerNode(clusterManagerNode *node) { 2116 if (node->context != NULL) redisFree(node->context); 2117 if (node->friends != NULL) { 2118 listIter li; 2119 listNode *ln; 2120 listRewind(node->friends,&li); 2121 while ((ln = listNext(&li)) != NULL) { 2122 clusterManagerNode *fn = ln->value; 2123 freeClusterManagerNode(fn); 2124 } 2125 listRelease(node->friends); 2126 node->friends = NULL; 2127 } 2128 if (node->name != NULL) sdsfree(node->name); 2129 if (node->replicate != NULL) sdsfree(node->replicate); 2130 if ((node->flags & CLUSTER_MANAGER_FLAG_FRIEND) && node->ip) 2131 sdsfree(node->ip); 2132 int i; 2133 if (node->migrating != NULL) { 2134 for (i = 0; i < node->migrating_count; i++) sdsfree(node->migrating[i]); 2135 zfree(node->migrating); 2136 } 2137 if (node->importing != NULL) { 2138 for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]); 2139 zfree(node->importing); 2140 } 2141 if (node->flags_str != NULL) { 2142 freeClusterManagerNodeFlags(node->flags_str); 2143 node->flags_str = NULL; 2144 } 2145 zfree(node); 2146 } 2147 2148 static void freeClusterManager(void) { 2149 listIter li; 2150 listNode *ln; 2151 if (cluster_manager.nodes != NULL) { 2152 listRewind(cluster_manager.nodes,&li); 2153 while ((ln = listNext(&li)) != NULL) { 2154 clusterManagerNode *n = ln->value; 2155 freeClusterManagerNode(n); 2156 } 2157 listRelease(cluster_manager.nodes); 2158 cluster_manager.nodes = NULL; 2159 } 2160 if (cluster_manager.errors != NULL) { 2161 listRewind(cluster_manager.errors,&li); 2162 while ((ln = listNext(&li)) != NULL) { 2163 sds err = ln->value; 2164 sdsfree(err); 2165 } 2166 listRelease(cluster_manager.errors); 2167 cluster_manager.errors = NULL; 2168 } 2169 if (clusterManagerUncoveredSlots != NULL) 2170 dictRelease(clusterManagerUncoveredSlots); 2171 } 2172 2173 static clusterManagerNode *clusterManagerNewNode(char *ip, int port) { 2174 clusterManagerNode *node = zmalloc(sizeof(*node)); 2175 node->context = NULL; 2176 node->name = NULL; 2177 node->ip = ip; 2178 node->port = port; 2179 node->current_epoch = 0; 2180 node->ping_sent = 0; 2181 node->ping_recv = 0; 2182 node->flags = 0; 2183 node->flags_str = NULL; 2184 node->replicate = NULL; 2185 node->dirty = 0; 2186 node->friends = NULL; 2187 node->migrating = NULL; 2188 node->importing = NULL; 2189 node->migrating_count = 0; 2190 node->importing_count = 0; 2191 node->replicas_count = 0; 2192 node->weight = 1.0f; 2193 node->balance = 0; 2194 clusterManagerNodeResetSlots(node); 2195 return node; 2196 } 2197 2198 /* Check whether reply is NULL or its type is REDIS_REPLY_ERROR. In the 2199 * latest case, if the 'err' arg is not NULL, it gets allocated with a copy 2200 * of reply error (it's up to the caller function to free it), elsewhere 2201 * the error is directly printed. */ 2202 static int clusterManagerCheckRedisReply(clusterManagerNode *n, 2203 redisReply *r, char **err) 2204 { 2205 int is_err = 0; 2206 if (!r || (is_err = (r->type == REDIS_REPLY_ERROR))) { 2207 if (is_err) { 2208 if (err != NULL) { 2209 *err = zmalloc((r->len + 1) * sizeof(char)); 2210 strcpy(*err, r->str); 2211 } else CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, r->str); 2212 } 2213 return 0; 2214 } 2215 return 1; 2216 } 2217 2218 /* Execute MULTI command on a cluster node. */ 2219 static int clusterManagerStartTransaction(clusterManagerNode *node) { 2220 redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI"); 2221 int success = clusterManagerCheckRedisReply(node, reply, NULL); 2222 if (reply) freeReplyObject(reply); 2223 return success; 2224 } 2225 2226 /* Execute EXEC command on a cluster node. */ 2227 static int clusterManagerExecTransaction(clusterManagerNode *node, 2228 clusterManagerOnReplyError onerror) 2229 { 2230 redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC"); 2231 int success = clusterManagerCheckRedisReply(node, reply, NULL); 2232 if (success) { 2233 if (reply->type != REDIS_REPLY_ARRAY) { 2234 success = 0; 2235 goto cleanup; 2236 } 2237 size_t i; 2238 for (i = 0; i < reply->elements; i++) { 2239 redisReply *r = reply->element[i]; 2240 char *err = NULL; 2241 success = clusterManagerCheckRedisReply(node, r, &err); 2242 if (!success && onerror) success = onerror(r, i); 2243 if (err) { 2244 if (!success) 2245 CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); 2246 zfree(err); 2247 } 2248 if (!success) break; 2249 } 2250 } 2251 cleanup: 2252 if (reply) freeReplyObject(reply); 2253 return success; 2254 } 2255 2256 static int clusterManagerNodeConnect(clusterManagerNode *node) { 2257 if (node->context) redisFree(node->context); 2258 node->context = redisConnect(node->ip, node->port); 2259 if (node->context->err) { 2260 fprintf(stderr,"Could not connect to Redis at "); 2261 fprintf(stderr,"%s:%d: %s\n", node->ip, node->port, 2262 node->context->errstr); 2263 redisFree(node->context); 2264 node->context = NULL; 2265 return 0; 2266 } 2267 /* Set aggressive KEEP_ALIVE socket option in the Redis context socket 2268 * in order to prevent timeouts caused by the execution of long 2269 * commands. At the same time this improves the detection of real 2270 * errors. */ 2271 anetKeepAlive(NULL, node->context->fd, REDIS_CLI_KEEPALIVE_INTERVAL); 2272 if (config.auth) { 2273 redisReply *reply = redisCommand(node->context,"AUTH %s",config.auth); 2274 int ok = clusterManagerCheckRedisReply(node, reply, NULL); 2275 if (reply != NULL) freeReplyObject(reply); 2276 if (!ok) return 0; 2277 } 2278 return 1; 2279 } 2280 2281 static void clusterManagerRemoveNodeFromList(list *nodelist, 2282 clusterManagerNode *node) { 2283 listIter li; 2284 listNode *ln; 2285 listRewind(nodelist, &li); 2286 while ((ln = listNext(&li)) != NULL) { 2287 if (node == ln->value) { 2288 listDelNode(nodelist, ln); 2289 break; 2290 } 2291 } 2292 } 2293 2294 /* Return the node with the specified name (ID) or NULL. */ 2295 static clusterManagerNode *clusterManagerNodeByName(const char *name) { 2296 if (cluster_manager.nodes == NULL) return NULL; 2297 clusterManagerNode *found = NULL; 2298 sds lcname = sdsempty(); 2299 lcname = sdscpy(lcname, name); 2300 sdstolower(lcname); 2301 listIter li; 2302 listNode *ln; 2303 listRewind(cluster_manager.nodes, &li); 2304 while ((ln = listNext(&li)) != NULL) { 2305 clusterManagerNode *n = ln->value; 2306 if (n->name && !sdscmp(n->name, lcname)) { 2307 found = n; 2308 break; 2309 } 2310 } 2311 sdsfree(lcname); 2312 return found; 2313 } 2314 2315 /* Like clusterManagerNodeByName but the specified name can be just the first 2316 * part of the node ID as long as the prefix in unique across the 2317 * cluster. 2318 */ 2319 static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char*name) 2320 { 2321 if (cluster_manager.nodes == NULL) return NULL; 2322 clusterManagerNode *found = NULL; 2323 sds lcname = sdsempty(); 2324 lcname = sdscpy(lcname, name); 2325 sdstolower(lcname); 2326 listIter li; 2327 listNode *ln; 2328 listRewind(cluster_manager.nodes, &li); 2329 while ((ln = listNext(&li)) != NULL) { 2330 clusterManagerNode *n = ln->value; 2331 if (n->name && 2332 strstr(n->name, lcname) == n->name) { 2333 found = n; 2334 break; 2335 } 2336 } 2337 sdsfree(lcname); 2338 return found; 2339 } 2340 2341 static void clusterManagerNodeResetSlots(clusterManagerNode *node) { 2342 memset(node->slots, 0, sizeof(node->slots)); 2343 node->slots_count = 0; 2344 } 2345 2346 /* Call "INFO" redis command on the specified node and return the reply. */ 2347 static redisReply *clusterManagerGetNodeRedisInfo(clusterManagerNode *node, 2348 char **err) 2349 { 2350 redisReply *info = CLUSTER_MANAGER_COMMAND(node, "INFO"); 2351 if (err != NULL) *err = NULL; 2352 if (info == NULL) return NULL; 2353 if (info->type == REDIS_REPLY_ERROR) { 2354 if (err != NULL) { 2355 *err = zmalloc((info->len + 1) * sizeof(char)); 2356 strcpy(*err, info->str); 2357 } 2358 freeReplyObject(info); 2359 return NULL; 2360 } 2361 return info; 2362 } 2363 2364 static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err) { 2365 redisReply *info = clusterManagerGetNodeRedisInfo(node, err); 2366 if (info == NULL) return 0; 2367 int is_cluster = (int) getLongInfoField(info->str, "cluster_enabled"); 2368 freeReplyObject(info); 2369 return is_cluster; 2370 } 2371 2372 /* Checks whether the node is empty. Node is considered not-empty if it has 2373 * some key or if it already knows other nodes */ 2374 static int clusterManagerNodeIsEmpty(clusterManagerNode *node, char **err) { 2375 redisReply *info = clusterManagerGetNodeRedisInfo(node, err); 2376 int is_empty = 1; 2377 if (info == NULL) return 0; 2378 if (strstr(info->str, "db0:") != NULL) { 2379 is_empty = 0; 2380 goto result; 2381 } 2382 freeReplyObject(info); 2383 info = CLUSTER_MANAGER_COMMAND(node, "CLUSTER INFO"); 2384 if (err != NULL) *err = NULL; 2385 if (!clusterManagerCheckRedisReply(node, info, err)) { 2386 is_empty = 0; 2387 goto result; 2388 } 2389 long known_nodes = getLongInfoField(info->str, "cluster_known_nodes"); 2390 is_empty = (known_nodes == 1); 2391 result: 2392 freeReplyObject(info); 2393 return is_empty; 2394 } 2395 2396 /* Return the anti-affinity score, which is a measure of the amount of 2397 * violations of anti-affinity in the current cluster layout, that is, how 2398 * badly the masters and slaves are distributed in the different IP 2399 * addresses so that slaves of the same master are not in the master 2400 * host and are also in different hosts. 2401 * 2402 * The score is calculated as follows: 2403 * 2404 * SAME_AS_MASTER = 10000 * each slave in the same IP of its master. 2405 * SAME_AS_SLAVE = 1 * each slave having the same IP as another slave 2406 of the same master. 2407 * FINAL_SCORE = SAME_AS_MASTER + SAME_AS_SLAVE 2408 * 2409 * So a greater score means a worse anti-affinity level, while zero 2410 * means perfect anti-affinity. 2411 * 2412 * The anti affinity optimizator will try to get a score as low as 2413 * possible. Since we do not want to sacrifice the fact that slaves should 2414 * not be in the same host as the master, we assign 10000 times the score 2415 * to this violation, so that we'll optimize for the second factor only 2416 * if it does not impact the first one. 2417 * 2418 * The ipnodes argument is an array of clusterManagerNodeArray, one for 2419 * each IP, while ip_count is the total number of IPs in the configuration. 2420 * 2421 * The function returns the above score, and the list of 2422 * offending slaves can be stored into the 'offending' argument, 2423 * so that the optimizer can try changing the configuration of the 2424 * slaves violating the anti-affinity goals. */ 2425 static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes, 2426 int ip_count, clusterManagerNode ***offending, int *offending_len) 2427 { 2428 int score = 0, i, j; 2429 int node_len = cluster_manager.nodes->len; 2430 clusterManagerNode **offending_p = NULL; 2431 if (offending != NULL) { 2432 *offending = zcalloc(node_len * sizeof(clusterManagerNode*)); 2433 offending_p = *offending; 2434 } 2435 /* For each set of nodes in the same host, split by 2436 * related nodes (masters and slaves which are involved in 2437 * replication of each other) */ 2438 for (i = 0; i < ip_count; i++) { 2439 clusterManagerNodeArray *node_array = &(ipnodes[i]); 2440 dict *related = dictCreate(&clusterManagerDictType, NULL); 2441 char *ip = NULL; 2442 for (j = 0; j < node_array->len; j++) { 2443 clusterManagerNode *node = node_array->nodes[j]; 2444 if (node == NULL) continue; 2445 if (!ip) ip = node->ip; 2446 sds types; 2447 /* We always use the Master ID as key. */ 2448 sds key = (!node->replicate ? node->name : node->replicate); 2449 assert(key != NULL); 2450 dictEntry *entry = dictFind(related, key); 2451 if (entry) types = sdsdup((sds) dictGetVal(entry)); 2452 else types = sdsempty(); 2453 /* Master type 'm' is always set as the first character of the 2454 * types string. */ 2455 if (!node->replicate) types = sdscatprintf(types, "m%s", types); 2456 else types = sdscat(types, "s"); 2457 dictReplace(related, key, types); 2458 } 2459 /* Now it's trivial to check, for each related group having the 2460 * same host, what is their local score. */ 2461 dictIterator *iter = dictGetIterator(related); 2462 dictEntry *entry; 2463 while ((entry = dictNext(iter)) != NULL) { 2464 sds types = (sds) dictGetVal(entry); 2465 sds name = (sds) dictGetKey(entry); 2466 int typeslen = sdslen(types); 2467 if (typeslen < 2) continue; 2468 if (types[0] == 'm') score += (10000 * (typeslen - 1)); 2469 else score += (1 * typeslen); 2470 if (offending == NULL) continue; 2471 /* Populate the list of offending nodes. */ 2472 listIter li; 2473 listNode *ln; 2474 listRewind(cluster_manager.nodes, &li); 2475 while ((ln = listNext(&li)) != NULL) { 2476 clusterManagerNode *n = ln->value; 2477 if (n->replicate == NULL) continue; 2478 if (!strcmp(n->replicate, name) && !strcmp(n->ip, ip)) { 2479 *(offending_p++) = n; 2480 if (offending_len != NULL) (*offending_len)++; 2481 break; 2482 } 2483 } 2484 } 2485 //if (offending_len != NULL) *offending_len = offending_p - *offending; 2486 dictReleaseIterator(iter); 2487 dictRelease(related); 2488 } 2489 return score; 2490 } 2491 2492 static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes, 2493 int ip_count) 2494 { 2495 clusterManagerNode **offenders = NULL; 2496 int score = clusterManagerGetAntiAffinityScore(ipnodes, ip_count, 2497 NULL, NULL); 2498 if (score == 0) goto cleanup; 2499 clusterManagerLogInfo(">>> Trying to optimize slaves allocation " 2500 "for anti-affinity\n"); 2501 int node_len = cluster_manager.nodes->len; 2502 int maxiter = 500 * node_len; // Effort is proportional to cluster size... 2503 srand(time(NULL)); 2504 while (maxiter > 0) { 2505 int offending_len = 0; 2506 if (offenders != NULL) { 2507 zfree(offenders); 2508 offenders = NULL; 2509 } 2510 score = clusterManagerGetAntiAffinityScore(ipnodes, 2511 ip_count, 2512 &offenders, 2513 &offending_len); 2514 if (score == 0) break; // Optimal anti affinity reached 2515 /* We'll try to randomly swap a slave's assigned master causing 2516 * an affinity problem with another random slave, to see if we 2517 * can improve the affinity. */ 2518 int rand_idx = rand() % offending_len; 2519 clusterManagerNode *first = offenders[rand_idx], 2520 *second = NULL; 2521 clusterManagerNode **other_replicas = zcalloc((node_len - 1) * 2522 sizeof(*other_replicas)); 2523 int other_replicas_count = 0; 2524 listIter li; 2525 listNode *ln; 2526 listRewind(cluster_manager.nodes, &li); 2527 while ((ln = listNext(&li)) != NULL) { 2528 clusterManagerNode *n = ln->value; 2529 if (n != first && n->replicate != NULL) 2530 other_replicas[other_replicas_count++] = n; 2531 } 2532 if (other_replicas_count == 0) { 2533 zfree(other_replicas); 2534 break; 2535 } 2536 rand_idx = rand() % other_replicas_count; 2537 second = other_replicas[rand_idx]; 2538 char *first_master = first->replicate, 2539 *second_master = second->replicate; 2540 first->replicate = second_master, first->dirty = 1; 2541 second->replicate = first_master, second->dirty = 1; 2542 int new_score = clusterManagerGetAntiAffinityScore(ipnodes, 2543 ip_count, 2544 NULL, NULL); 2545 /* If the change actually makes thing worse, revert. Otherwise 2546 * leave as it is because the best solution may need a few 2547 * combined swaps. */ 2548 if (new_score > score) { 2549 first->replicate = first_master; 2550 second->replicate = second_master; 2551 } 2552 zfree(other_replicas); 2553 maxiter--; 2554 } 2555 score = clusterManagerGetAntiAffinityScore(ipnodes, ip_count, NULL, NULL); 2556 char *msg; 2557 int perfect = (score == 0); 2558 int log_level = (perfect ? CLUSTER_MANAGER_LOG_LVL_SUCCESS : 2559 CLUSTER_MANAGER_LOG_LVL_WARN); 2560 if (perfect) msg = "[OK] Perfect anti-affinity obtained!"; 2561 else if (score >= 10000) 2562 msg = ("[WARNING] Some slaves are in the same host as their master"); 2563 else 2564 msg=("[WARNING] Some slaves of the same master are in the same host"); 2565 clusterManagerLog(log_level, "%s\n", msg); 2566 cleanup: 2567 zfree(offenders); 2568 } 2569 2570 /* Return a representable string of the node's flags */ 2571 static sds clusterManagerNodeFlagString(clusterManagerNode *node) { 2572 sds flags = sdsempty(); 2573 if (!node->flags_str) return flags; 2574 int empty = 1; 2575 listIter li; 2576 listNode *ln; 2577 listRewind(node->flags_str, &li); 2578 while ((ln = listNext(&li)) != NULL) { 2579 sds flag = ln->value; 2580 if (strcmp(flag, "myself") == 0) continue; 2581 if (!empty) flags = sdscat(flags, ","); 2582 flags = sdscatfmt(flags, "%S", flag); 2583 empty = 0; 2584 } 2585 return flags; 2586 } 2587 2588 /* Return a representable string of the node's slots */ 2589 static sds clusterManagerNodeSlotsString(clusterManagerNode *node) { 2590 sds slots = sdsempty(); 2591 int first_range_idx = -1, last_slot_idx = -1, i; 2592 for (i = 0; i < CLUSTER_MANAGER_SLOTS; i++) { 2593 int has_slot = node->slots[i]; 2594 if (has_slot) { 2595 if (first_range_idx == -1) { 2596 if (sdslen(slots)) slots = sdscat(slots, ","); 2597 first_range_idx = i; 2598 slots = sdscatfmt(slots, "[%u", i); 2599 } 2600 last_slot_idx = i; 2601 } else { 2602 if (last_slot_idx >= 0) { 2603 if (first_range_idx == last_slot_idx) 2604 slots = sdscat(slots, "]"); 2605 else slots = sdscatfmt(slots, "-%u]", last_slot_idx); 2606 } 2607 last_slot_idx = -1; 2608 first_range_idx = -1; 2609 } 2610 } 2611 if (last_slot_idx >= 0) { 2612 if (first_range_idx == last_slot_idx) slots = sdscat(slots, "]"); 2613 else slots = sdscatfmt(slots, "-%u]", last_slot_idx); 2614 } 2615 return slots; 2616 } 2617 2618 /* ----------------------------------------------------------------------------- 2619 * Key space handling 2620 * -------------------------------------------------------------------------- */ 2621 2622 /* We have 16384 hash slots. The hash slot of a given key is obtained 2623 * as the least significant 14 bits of the crc16 of the key. 2624 * 2625 * However if the key contains the {...} pattern, only the part between 2626 * { and } is hashed. This may be useful in the future to force certain 2627 * keys to be in the same node (assuming no resharding is in progress). */ 2628 static unsigned int clusterManagerKeyHashSlot(char *key, int keylen) { 2629 int s, e; /* start-end indexes of { and } */ 2630 2631 for (s = 0; s < keylen; s++) 2632 if (key[s] == '{') break; 2633 2634 /* No '{' ? Hash the whole key. This is the base case. */ 2635 if (s == keylen) return crc16(key,keylen) & 0x3FFF; 2636 2637 /* '{' found? Check if we have the corresponding '}'. */ 2638 for (e = s+1; e < keylen; e++) 2639 if (key[e] == '}') break; 2640 2641 /* No '}' or nothing between {} ? Hash the whole key. */ 2642 if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF; 2643 2644 /* If we are here there is both a { and a } on its right. Hash 2645 * what is in the middle between { and }. */ 2646 return crc16(key+s+1,e-s-1) & 0x3FFF; 2647 } 2648 2649 /* Return a string representation of the cluster node. */ 2650 static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent) { 2651 sds info = sdsempty(); 2652 sds spaces = sdsempty(); 2653 int i; 2654 for (i = 0; i < indent; i++) spaces = sdscat(spaces, " "); 2655 if (indent) info = sdscat(info, spaces); 2656 int is_master = !(node->flags & CLUSTER_MANAGER_FLAG_SLAVE); 2657 char *role = (is_master ? "M" : "S"); 2658 sds slots = NULL; 2659 if (node->dirty && node->replicate != NULL) 2660 info = sdscatfmt(info, "S: %S %s:%u", node->name, node->ip, node->port); 2661 else { 2662 slots = clusterManagerNodeSlotsString(node); 2663 sds flags = clusterManagerNodeFlagString(node); 2664 info = sdscatfmt(info, "%s: %S %s:%u\n" 2665 "%s slots:%S (%u slots) " 2666 "%S", 2667 role, node->name, node->ip, node->port, spaces, 2668 slots, node->slots_count, flags); 2669 sdsfree(slots); 2670 sdsfree(flags); 2671 } 2672 if (node->replicate != NULL) 2673 info = sdscatfmt(info, "\n%s replicates %S", spaces, node->replicate); 2674 else if (node->replicas_count) 2675 info = sdscatfmt(info, "\n%s %U additional replica(s)", 2676 spaces, node->replicas_count); 2677 sdsfree(spaces); 2678 return info; 2679 } 2680 2681 static void clusterManagerShowNodes(void) { 2682 listIter li; 2683 listNode *ln; 2684 listRewind(cluster_manager.nodes, &li); 2685 while ((ln = listNext(&li)) != NULL) { 2686 clusterManagerNode *node = ln->value; 2687 sds info = clusterManagerNodeInfo(node, 0); 2688 printf("%s\n", (char *) info); 2689 sdsfree(info); 2690 } 2691 } 2692 2693 static void clusterManagerShowClusterInfo(void) { 2694 int masters = 0; 2695 int keys = 0; 2696 listIter li; 2697 listNode *ln; 2698 listRewind(cluster_manager.nodes, &li); 2699 while ((ln = listNext(&li)) != NULL) { 2700 clusterManagerNode *node = ln->value; 2701 if (!(node->flags & CLUSTER_MANAGER_FLAG_SLAVE)) { 2702 if (!node->name) continue; 2703 int replicas = 0; 2704 int dbsize = -1; 2705 char name[9]; 2706 memcpy(name, node->name, 8); 2707 name[8] = '\0'; 2708 listIter ri; 2709 listNode *rn; 2710 listRewind(cluster_manager.nodes, &ri); 2711 while ((rn = listNext(&ri)) != NULL) { 2712 clusterManagerNode *n = rn->value; 2713 if (n == node || !(n->flags & CLUSTER_MANAGER_FLAG_SLAVE)) 2714 continue; 2715 if (n->replicate && !strcmp(n->replicate, node->name)) 2716 replicas++; 2717 } 2718 redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "DBSIZE"); 2719 if (reply != NULL || reply->type == REDIS_REPLY_INTEGER) 2720 dbsize = reply->integer; 2721 if (dbsize < 0) { 2722 char *err = ""; 2723 if (reply != NULL && reply->type == REDIS_REPLY_ERROR) 2724 err = reply->str; 2725 CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); 2726 if (reply != NULL) freeReplyObject(reply); 2727 return; 2728 }; 2729 if (reply != NULL) freeReplyObject(reply); 2730 printf("%s:%d (%s...) -> %d keys | %d slots | %d slaves.\n", 2731 node->ip, node->port, name, dbsize, 2732 node->slots_count, replicas); 2733 masters++; 2734 keys += dbsize; 2735 } 2736 } 2737 clusterManagerLogOk("[OK] %d keys in %d masters.\n", keys, masters); 2738 float keys_per_slot = keys / (float) CLUSTER_MANAGER_SLOTS; 2739 printf("%.2f keys per slot on average.\n", keys_per_slot); 2740 } 2741 2742 /* Flush dirty slots configuration of the node by calling CLUSTER ADDSLOTS */ 2743 static int clusterManagerAddSlots(clusterManagerNode *node, char**err) 2744 { 2745 redisReply *reply = NULL; 2746 void *_reply = NULL; 2747 int success = 1; 2748 /* First two args are used for the command itself. */ 2749 int argc = node->slots_count + 2; 2750 sds *argv = zmalloc(argc * sizeof(*argv)); 2751 size_t *argvlen = zmalloc(argc * sizeof(*argvlen)); 2752 argv[0] = "CLUSTER"; 2753 argv[1] = "ADDSLOTS"; 2754 argvlen[0] = 7; 2755 argvlen[1] = 8; 2756 *err = NULL; 2757 int i, argv_idx = 2; 2758 for (i = 0; i < CLUSTER_MANAGER_SLOTS; i++) { 2759 if (argv_idx >= argc) break; 2760 if (node->slots[i]) { 2761 argv[argv_idx] = sdsfromlonglong((long long) i); 2762 argvlen[argv_idx] = sdslen(argv[argv_idx]); 2763 argv_idx++; 2764 } 2765 } 2766 if (!argv_idx) { 2767 success = 0; 2768 goto cleanup; 2769 } 2770 redisAppendCommandArgv(node->context,argc,(const char**)argv,argvlen); 2771 if (redisGetReply(node->context, &_reply) != REDIS_OK) { 2772 success = 0; 2773 goto cleanup; 2774 } 2775 reply = (redisReply*) _reply; 2776 success = clusterManagerCheckRedisReply(node, reply, err); 2777 cleanup: 2778 zfree(argvlen); 2779 if (argv != NULL) { 2780 for (i = 2; i < argc; i++) sdsfree(argv[i]); 2781 zfree(argv); 2782 } 2783 if (reply != NULL) freeReplyObject(reply); 2784 return success; 2785 } 2786 2787 /* Set slot status to "importing" or "migrating" */ 2788 static int clusterManagerSetSlot(clusterManagerNode *node1, 2789 clusterManagerNode *node2, 2790 int slot, const char *status, char **err) { 2791 redisReply *reply = CLUSTER_MANAGER_COMMAND(node1, "CLUSTER " 2792 "SETSLOT %d %s %s", 2793 slot, status, 2794 (char *) node2->name); 2795 if (err != NULL) *err = NULL; 2796 if (!reply) return 0; 2797 int success = 1; 2798 if (reply->type == REDIS_REPLY_ERROR) { 2799 success = 0; 2800 if (err != NULL) { 2801 *err = zmalloc((reply->len + 1) * sizeof(char)); 2802 strcpy(*err, reply->str); 2803 } else CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, reply->str); 2804 goto cleanup; 2805 } 2806 cleanup: 2807 freeReplyObject(reply); 2808 return success; 2809 } 2810 2811 static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) { 2812 redisReply *reply = CLUSTER_MANAGER_COMMAND(node, 2813 "CLUSTER SETSLOT %d %s", slot, "STABLE"); 2814 int success = clusterManagerCheckRedisReply(node, reply, NULL); 2815 if (reply) freeReplyObject(reply); 2816 return success; 2817 } 2818 2819 static int clusterManagerDelSlot(clusterManagerNode *node, int slot, 2820 int ignore_unassigned_err) 2821 { 2822 redisReply *reply = CLUSTER_MANAGER_COMMAND(node, 2823 "CLUSTER DELSLOTS %d", slot); 2824 char *err = NULL; 2825 int success = clusterManagerCheckRedisReply(node, reply, &err); 2826 if (!success && reply && reply->type == REDIS_REPLY_ERROR && 2827 ignore_unassigned_err && 2828 strstr(reply->str, "already unassigned") != NULL) success = 1; 2829 if (!success && err != NULL) { 2830 CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); 2831 zfree(err); 2832 } 2833 if (reply) freeReplyObject(reply); 2834 return success; 2835 } 2836 2837 static int clusterManagerAddSlot(clusterManagerNode *node, int slot) { 2838 redisReply *reply = CLUSTER_MANAGER_COMMAND(node, 2839 "CLUSTER ADDSLOTS %d", slot); 2840 int success = clusterManagerCheckRedisReply(node, reply, NULL); 2841 if (reply) freeReplyObject(reply); 2842 return success; 2843 } 2844 2845 static signed int clusterManagerCountKeysInSlot(clusterManagerNode *node, 2846 int slot) 2847 { 2848 redisReply *reply = CLUSTER_MANAGER_COMMAND(node, 2849 "CLUSTER COUNTKEYSINSLOT %d", slot); 2850 int count = -1; 2851 int success = clusterManagerCheckRedisReply(node, reply, NULL); 2852 if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer; 2853 if (reply) freeReplyObject(reply); 2854 return count; 2855 } 2856 2857 static int clusterManagerBumpEpoch(clusterManagerNode *node) { 2858 redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH"); 2859 int success = clusterManagerCheckRedisReply(node, reply, NULL); 2860 if (reply) freeReplyObject(reply); 2861 return success; 2862 } 2863 2864 static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) { 2865 if (bulk_idx == 0 && reply) { 2866 if (reply->type == REDIS_REPLY_ERROR) 2867 return strstr(reply->str, "already unassigned") != NULL; 2868 } 2869 return 0; 2870 } 2871 2872 static int clusterManagerSetSlotOwner(clusterManagerNode *owner, 2873 int slot, 2874 int do_clear) 2875 { 2876 int success = clusterManagerStartTransaction(owner); 2877 if (!success) return 0; 2878 /* Ensure the slot is not already assigned. */ 2879 clusterManagerDelSlot(owner, slot, 1); 2880 /* Add the slot and bump epoch. */ 2881 clusterManagerAddSlot(owner, slot); 2882 if (do_clear) clusterManagerClearSlotStatus(owner, slot); 2883 clusterManagerBumpEpoch(owner); 2884 success = clusterManagerExecTransaction(owner, 2885 clusterManagerIgnoreUnassignedErr); 2886 return success; 2887 } 2888 2889 /* Migrate keys taken from reply->elements. It returns the reply from the 2890 * MIGRATE command, or NULL if something goes wrong. If the argument 'dots' 2891 * is not NULL, a dot will be printed for every migrated key. */ 2892 static redisReply *clusterManagerMigrateKeysInReply(clusterManagerNode *source, 2893 clusterManagerNode *target, 2894 redisReply *reply, 2895 int replace, int timeout, 2896 char *dots) 2897 { 2898 redisReply *migrate_reply = NULL; 2899 char **argv = NULL; 2900 size_t *argv_len = NULL; 2901 int c = (replace ? 8 : 7); 2902 if (config.auth) c += 2; 2903 size_t argc = c + reply->elements; 2904 size_t i, offset = 6; // Keys Offset 2905 argv = zcalloc(argc * sizeof(char *)); 2906 argv_len = zcalloc(argc * sizeof(size_t)); 2907 char portstr[255]; 2908 char timeoutstr[255]; 2909 snprintf(portstr, 10, "%d", target->port); 2910 snprintf(timeoutstr, 10, "%d", timeout); 2911 argv[0] = "MIGRATE"; 2912 argv_len[0] = 7; 2913 argv[1] = target->ip; 2914 argv_len[1] = strlen(target->ip); 2915 argv[2] = portstr; 2916 argv_len[2] = strlen(portstr); 2917 argv[3] = ""; 2918 argv_len[3] = 0; 2919 argv[4] = "0"; 2920 argv_len[4] = 1; 2921 argv[5] = timeoutstr; 2922 argv_len[5] = strlen(timeoutstr); 2923 if (replace) { 2924 argv[offset] = "REPLACE"; 2925 argv_len[offset] = 7; 2926 offset++; 2927 } 2928 if (config.auth) { 2929 argv[offset] = "AUTH"; 2930 argv_len[offset] = 4; 2931 offset++; 2932 argv[offset] = config.auth; 2933 argv_len[offset] = strlen(config.auth); 2934 offset++; 2935 } 2936 argv[offset] = "KEYS"; 2937 argv_len[offset] = 4; 2938 offset++; 2939 for (i = 0; i < reply->elements; i++) { 2940 redisReply *entry = reply->element[i]; 2941 size_t idx = i + offset; 2942 assert(entry->type == REDIS_REPLY_STRING); 2943 argv[idx] = (char *) sdsnew(entry->str); 2944 argv_len[idx] = entry->len; 2945 if (dots) dots[i] = '.'; 2946 } 2947 if (dots) dots[reply->elements] = '\0'; 2948 void *_reply = NULL; 2949 redisAppendCommandArgv(source->context,argc, 2950 (const char**)argv,argv_len); 2951 int success = (redisGetReply(source->context, &_reply) == REDIS_OK); 2952 for (i = 0; i < reply->elements; i++) sdsfree(argv[i + offset]); 2953 if (!success) goto cleanup; 2954 migrate_reply = (redisReply *) _reply; 2955 cleanup: 2956 zfree(argv); 2957 zfree(argv_len); 2958 return migrate_reply; 2959 } 2960 2961 /* Migrate all keys in the given slot from source to target.*/ 2962 static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, 2963 clusterManagerNode *target, 2964 int slot, int timeout, 2965 int pipeline, int verbose, 2966 char **err) 2967 { 2968 int success = 1; 2969 int replace_existing_keys = (config.cluster_manager_command.flags & 2970 (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE)); 2971 while (1) { 2972 char *dots = NULL; 2973 redisReply *reply = NULL, *migrate_reply = NULL; 2974 reply = CLUSTER_MANAGER_COMMAND(source, "CLUSTER " 2975 "GETKEYSINSLOT %d %d", slot, 2976 pipeline); 2977 success = (reply != NULL); 2978 if (!success) return 0; 2979 if (reply->type == REDIS_REPLY_ERROR) { 2980 success = 0; 2981 if (err != NULL) { 2982 *err = zmalloc((reply->len + 1) * sizeof(char)); 2983 strcpy(*err, reply->str); 2984 CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, *err); 2985 } 2986 goto next; 2987 } 2988 assert(reply->type == REDIS_REPLY_ARRAY); 2989 size_t count = reply->elements; 2990 if (count == 0) { 2991 freeReplyObject(reply); 2992 break; 2993 } 2994 if (verbose) dots = zmalloc((count+1) * sizeof(char)); 2995 /* Calling MIGRATE command. */ 2996 migrate_reply = clusterManagerMigrateKeysInReply(source, target, 2997 reply, 0, timeout, 2998 dots); 2999 if (migrate_reply == NULL) goto next; 3000 if (migrate_reply->type == REDIS_REPLY_ERROR) { 3001 int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL; 3002 int not_served = strstr(migrate_reply->str, "slot not served") != NULL; 3003 if (replace_existing_keys && (is_busy || not_served)) { 3004 /* If the key already exists, try to migrate keys 3005 * adding REPLACE option. 3006 * If the key's slot is not served, try to assign slot 3007 * to the target node. */ 3008 if (not_served) 3009 clusterManagerSetSlot(source, target, slot, "node", NULL); 3010 clusterManagerLogWarn("*** Target key exists. " 3011 "Replacing it for FIX.\n"); 3012 freeReplyObject(migrate_reply); 3013 migrate_reply = clusterManagerMigrateKeysInReply(source, 3014 target, 3015 reply, 3016 is_busy, 3017 timeout, 3018 NULL); 3019 success = (migrate_reply != NULL && 3020 migrate_reply->type != REDIS_REPLY_ERROR); 3021 } else success = 0; 3022 if (!success) { 3023 if (migrate_reply != NULL) { 3024 if (err) { 3025 *err = zmalloc((migrate_reply->len + 1) * sizeof(char)); 3026 strcpy(*err, migrate_reply->str); 3027 } 3028 printf("\n"); 3029 CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, 3030 migrate_reply->str); 3031 } 3032 goto next; 3033 } 3034 } 3035 if (verbose) { 3036 printf("%s", dots); 3037 fflush(stdout); 3038 } 3039 next: 3040 if (reply != NULL) freeReplyObject(reply); 3041 if (migrate_reply != NULL) freeReplyObject(migrate_reply); 3042 if (dots) zfree(dots); 3043 if (!success) break; 3044 } 3045 return success; 3046 } 3047 3048 /* Move slots between source and target nodes using MIGRATE. 3049 * 3050 * Options: 3051 * CLUSTER_MANAGER_OPT_VERBOSE -- Print a dot for every moved key. 3052 * CLUSTER_MANAGER_OPT_COLD -- Move keys without opening slots / 3053 * reconfiguring the nodes. 3054 * CLUSTER_MANAGER_OPT_UPDATE -- Update node->slots for source/target nodes. 3055 * CLUSTER_MANAGER_OPT_QUIET -- Don't print info messages. 3056 */ 3057 static int clusterManagerMoveSlot(clusterManagerNode *source, 3058 clusterManagerNode *target, 3059 int slot, int opts, char**err) 3060 { 3061 if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) { 3062 printf("Moving slot %d from %s:%d to %s:%d: ", slot, source->ip, 3063 source->port, target->ip, target->port); 3064 fflush(stdout); 3065 } 3066 if (err != NULL) *err = NULL; 3067 int pipeline = config.cluster_manager_command.pipeline, 3068 timeout = config.cluster_manager_command.timeout, 3069 print_dots = (opts & CLUSTER_MANAGER_OPT_VERBOSE), 3070 option_cold = (opts & CLUSTER_MANAGER_OPT_COLD), 3071 success = 1; 3072 if (!option_cold) { 3073 success = clusterManagerSetSlot(target, source, slot, 3074 "importing", err); 3075 if (!success) return 0; 3076 success = clusterManagerSetSlot(source, target, slot, 3077 "migrating", err); 3078 if (!success) return 0; 3079 } 3080 success = clusterManagerMigrateKeysInSlot(source, target, slot, timeout, 3081 pipeline, print_dots, err); 3082 if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) printf("\n"); 3083 if (!success) return 0; 3084 /* Set the new node as the owner of the slot in all the known nodes. */ 3085 if (!option_cold) { 3086 listIter li; 3087 listNode *ln; 3088 listRewind(cluster_manager.nodes, &li); 3089 while ((ln = listNext(&li)) != NULL) { 3090 clusterManagerNode *n = ln->value; 3091 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; 3092 redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER " 3093 "SETSLOT %d %s %s", 3094 slot, "node", 3095 target->name); 3096 success = (r != NULL); 3097 if (!success) return 0; 3098 if (r->type == REDIS_REPLY_ERROR) { 3099 success = 0; 3100 if (err != NULL) { 3101 *err = zmalloc((r->len + 1) * sizeof(char)); 3102 strcpy(*err, r->str); 3103 CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, *err); 3104 } 3105 } 3106 freeReplyObject(r); 3107 if (!success) return 0; 3108 } 3109 } 3110 /* Update the node logical config */ 3111 if (opts & CLUSTER_MANAGER_OPT_UPDATE) { 3112 source->slots[slot] = 0; 3113 target->slots[slot] = 1; 3114 } 3115 return 1; 3116 } 3117 3118 /* Flush the dirty node configuration by calling replicate for slaves or 3119 * adding the slots defined in the masters. */ 3120 static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err) { 3121 if (!node->dirty) return 0; 3122 redisReply *reply = NULL; 3123 int is_err = 0, success = 1; 3124 if (err != NULL) *err = NULL; 3125 if (node->replicate != NULL) { 3126 reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER REPLICATE %s", 3127 node->replicate); 3128 if (reply == NULL || (is_err = (reply->type == REDIS_REPLY_ERROR))) { 3129 if (is_err && err != NULL) { 3130 *err = zmalloc((reply->len + 1) * sizeof(char)); 3131 strcpy(*err, reply->str); 3132 } 3133 success = 0; 3134 /* If the cluster did not already joined it is possible that 3135 * the slave does not know the master node yet. So on errors 3136 * we return ASAP leaving the dirty flag set, to flush the 3137 * config later. */ 3138 goto cleanup; 3139 } 3140 } else { 3141 int added = clusterManagerAddSlots(node, err); 3142 if (!added || *err != NULL) success = 0; 3143 } 3144 node->dirty = 0; 3145 cleanup: 3146 if (reply != NULL) freeReplyObject(reply); 3147 return success; 3148 } 3149 3150 /* Wait until the cluster configuration is consistent. */ 3151 static void clusterManagerWaitForClusterJoin(void) { 3152 printf("Waiting for the cluster to join\n"); 3153 while(!clusterManagerIsConfigConsistent()) { 3154 printf("."); 3155 fflush(stdout); 3156 sleep(1); 3157 } 3158 printf("\n"); 3159 } 3160 3161 /* Load node's cluster configuration by calling "CLUSTER NODES" command. 3162 * Node's configuration (name, replicate, slots, ...) is then updated. 3163 * If CLUSTER_MANAGER_OPT_GETFRIENDS flag is set into 'opts' argument, 3164 * and node already knows other nodes, the node's friends list is populated 3165 * with the other nodes info. */ 3166 static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts, 3167 char **err) 3168 { 3169 redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES"); 3170 int success = 1; 3171 *err = NULL; 3172 if (!clusterManagerCheckRedisReply(node, reply, err)) { 3173 success = 0; 3174 goto cleanup; 3175 } 3176 int getfriends = (opts & CLUSTER_MANAGER_OPT_GETFRIENDS); 3177 char *lines = reply->str, *p, *line; 3178 while ((p = strstr(lines, "\n")) != NULL) { 3179 *p = '\0'; 3180 line = lines; 3181 lines = p + 1; 3182 char *name = NULL, *addr = NULL, *flags = NULL, *master_id = NULL, 3183 *ping_sent = NULL, *ping_recv = NULL, *config_epoch = NULL, 3184 *link_status = NULL; 3185 UNUSED(link_status); 3186 int i = 0; 3187 while ((p = strchr(line, ' ')) != NULL) { 3188 *p = '\0'; 3189 char *token = line; 3190 line = p + 1; 3191 switch(i++){ 3192 case 0: name = token; break; 3193 case 1: addr = token; break; 3194 case 2: flags = token; break; 3195 case 3: master_id = token; break; 3196 case 4: ping_sent = token; break; 3197 case 5: ping_recv = token; break; 3198 case 6: config_epoch = token; break; 3199 case 7: link_status = token; break; 3200 } 3201 if (i == 8) break; // Slots 3202 } 3203 if (!flags) { 3204 success = 0; 3205 goto cleanup; 3206 } 3207 int myself = (strstr(flags, "myself") != NULL); 3208 clusterManagerNode *currentNode = NULL; 3209 if (myself) { 3210 node->flags |= CLUSTER_MANAGER_FLAG_MYSELF; 3211 currentNode = node; 3212 clusterManagerNodeResetSlots(node); 3213 if (i == 8) { 3214 int remaining = strlen(line); 3215 while (remaining > 0) { 3216 p = strchr(line, ' '); 3217 if (p == NULL) p = line + remaining; 3218 remaining -= (p - line); 3219 3220 char *slotsdef = line; 3221 *p = '\0'; 3222 if (remaining) { 3223 line = p + 1; 3224 remaining--; 3225 } else line = p; 3226 char *dash = NULL; 3227 if (slotsdef[0] == '[') { 3228 slotsdef++; 3229 if ((p = strstr(slotsdef, "->-"))) { // Migrating 3230 *p = '\0'; 3231 p += 3; 3232 char *closing_bracket = strchr(p, ']'); 3233 if (closing_bracket) *closing_bracket = '\0'; 3234 sds slot = sdsnew(slotsdef); 3235 sds dst = sdsnew(p); 3236 node->migrating_count += 2; 3237 node->migrating = zrealloc(node->migrating, 3238 (node->migrating_count * sizeof(sds))); 3239 node->migrating[node->migrating_count - 2] = 3240 slot; 3241 node->migrating[node->migrating_count - 1] = 3242 dst; 3243 } else if ((p = strstr(slotsdef, "-<-"))) {//Importing 3244 *p = '\0'; 3245 p += 3; 3246 char *closing_bracket = strchr(p, ']'); 3247 if (closing_bracket) *closing_bracket = '\0'; 3248 sds slot = sdsnew(slotsdef); 3249 sds src = sdsnew(p); 3250 node->importing_count += 2; 3251 node->importing = zrealloc(node->importing, 3252 (node->importing_count * sizeof(sds))); 3253 node->importing[node->importing_count - 2] = 3254 slot; 3255 node->importing[node->importing_count - 1] = 3256 src; 3257 } 3258 } else if ((dash = strchr(slotsdef, '-')) != NULL) { 3259 p = dash; 3260 int start, stop; 3261 *p = '\0'; 3262 start = atoi(slotsdef); 3263 stop = atoi(p + 1); 3264 node->slots_count += (stop - (start - 1)); 3265 while (start <= stop) node->slots[start++] = 1; 3266 } else if (p > slotsdef) { 3267 node->slots[atoi(slotsdef)] = 1; 3268 node->slots_count++; 3269 } 3270 } 3271 } 3272 node->dirty = 0; 3273 } else if (!getfriends) { 3274 if (!(node->flags & CLUSTER_MANAGER_FLAG_MYSELF)) continue; 3275 else break; 3276 } else { 3277 if (addr == NULL) { 3278 fprintf(stderr, "Error: invalid CLUSTER NODES reply\n"); 3279 success = 0; 3280 goto cleanup; 3281 } 3282 char *c = strrchr(addr, '@'); 3283 if (c != NULL) *c = '\0'; 3284 c = strrchr(addr, ':'); 3285 if (c == NULL) { 3286 fprintf(stderr, "Error: invalid CLUSTER NODES reply\n"); 3287 success = 0; 3288 goto cleanup; 3289 } 3290 *c = '\0'; 3291 int port = atoi(++c); 3292 currentNode = clusterManagerNewNode(sdsnew(addr), port); 3293 currentNode->flags |= CLUSTER_MANAGER_FLAG_FRIEND; 3294 if (node->friends == NULL) node->friends = listCreate(); 3295 listAddNodeTail(node->friends, currentNode); 3296 } 3297 if (name != NULL) { 3298 if (currentNode->name) sdsfree(currentNode->name); 3299 currentNode->name = sdsnew(name); 3300 } 3301 if (currentNode->flags_str != NULL) 3302 freeClusterManagerNodeFlags(currentNode->flags_str); 3303 currentNode->flags_str = listCreate(); 3304 int flag_len; 3305 while ((flag_len = strlen(flags)) > 0) { 3306 sds flag = NULL; 3307 char *fp = strchr(flags, ','); 3308 if (fp) { 3309 *fp = '\0'; 3310 flag = sdsnew(flags); 3311 flags = fp + 1; 3312 } else { 3313 flag = sdsnew(flags); 3314 flags += flag_len; 3315 } 3316 if (strcmp(flag, "noaddr") == 0) 3317 currentNode->flags |= CLUSTER_MANAGER_FLAG_NOADDR; 3318 else if (strcmp(flag, "disconnected") == 0) 3319 currentNode->flags |= CLUSTER_MANAGER_FLAG_DISCONNECT; 3320 else if (strcmp(flag, "fail") == 0) 3321 currentNode->flags |= CLUSTER_MANAGER_FLAG_FAIL; 3322 else if (strcmp(flag, "slave") == 0) { 3323 currentNode->flags |= CLUSTER_MANAGER_FLAG_SLAVE; 3324 if (master_id != NULL) { 3325 if (currentNode->replicate) sdsfree(currentNode->replicate); 3326 currentNode->replicate = sdsnew(master_id); 3327 } 3328 } 3329 listAddNodeTail(currentNode->flags_str, flag); 3330 } 3331 if (config_epoch != NULL) 3332 currentNode->current_epoch = atoll(config_epoch); 3333 if (ping_sent != NULL) currentNode->ping_sent = atoll(ping_sent); 3334 if (ping_recv != NULL) currentNode->ping_recv = atoll(ping_recv); 3335 if (!getfriends && myself) break; 3336 } 3337 cleanup: 3338 if (reply) freeReplyObject(reply); 3339 return success; 3340 } 3341 3342 /* Retrieves info about the cluster using argument 'node' as the starting 3343 * point. All nodes will be loaded inside the cluster_manager.nodes list. 3344 * Warning: if something goes wrong, it will free the starting node before 3345 * returning 0. */ 3346 static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts) { 3347 if (node->context == NULL && !clusterManagerNodeConnect(node)) { 3348 freeClusterManagerNode(node); 3349 return 0; 3350 } 3351 opts |= CLUSTER_MANAGER_OPT_GETFRIENDS; 3352 char *e = NULL; 3353 if (!clusterManagerNodeIsCluster(node, &e)) { 3354 clusterManagerPrintNotClusterNodeError(node, e); 3355 if (e) zfree(e); 3356 freeClusterManagerNode(node); 3357 return 0; 3358 } 3359 e = NULL; 3360 if (!clusterManagerNodeLoadInfo(node, opts, &e)) { 3361 if (e) { 3362 CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, e); 3363 zfree(e); 3364 } 3365 freeClusterManagerNode(node); 3366 return 0; 3367 } 3368 listIter li; 3369 listNode *ln; 3370 if (cluster_manager.nodes != NULL) { 3371 listRewind(cluster_manager.nodes, &li); 3372 while ((ln = listNext(&li)) != NULL) 3373 freeClusterManagerNode((clusterManagerNode *) ln->value); 3374 listRelease(cluster_manager.nodes); 3375 } 3376 cluster_manager.nodes = listCreate(); 3377 listAddNodeTail(cluster_manager.nodes, node); 3378 if (node->friends != NULL) { 3379 listRewind(node->friends, &li); 3380 while ((ln = listNext(&li)) != NULL) { 3381 clusterManagerNode *friend = ln->value; 3382 if (!friend->ip || !friend->port) goto invalid_friend; 3383 if (!friend->context && !clusterManagerNodeConnect(friend)) 3384 goto invalid_friend; 3385 e = NULL; 3386 if (clusterManagerNodeLoadInfo(friend, 0, &e)) { 3387 if (friend->flags & (CLUSTER_MANAGER_FLAG_NOADDR | 3388 CLUSTER_MANAGER_FLAG_DISCONNECT | 3389 CLUSTER_MANAGER_FLAG_FAIL)) 3390 goto invalid_friend; 3391 listAddNodeTail(cluster_manager.nodes, friend); 3392 } else { 3393 clusterManagerLogErr("[ERR] Unable to load info for " 3394 "node %s:%d\n", 3395 friend->ip, friend->port); 3396 goto invalid_friend; 3397 } 3398 continue; 3399 invalid_friend: 3400 freeClusterManagerNode(friend); 3401 } 3402 listRelease(node->friends); 3403 node->friends = NULL; 3404 } 3405 // Count replicas for each node 3406 listRewind(cluster_manager.nodes, &li); 3407 while ((ln = listNext(&li)) != NULL) { 3408 clusterManagerNode *n = ln->value; 3409 if (n->replicate != NULL) { 3410 clusterManagerNode *master = clusterManagerNodeByName(n->replicate); 3411 if (master == NULL) { 3412 clusterManagerLogWarn("*** WARNING: %s:%d claims to be " 3413 "slave of unknown node ID %s.\n", 3414 n->ip, n->port, n->replicate); 3415 } else master->replicas_count++; 3416 } 3417 } 3418 return 1; 3419 } 3420 3421 /* Compare functions used by various sorting operations. */ 3422 int clusterManagerSlotCompare(const void *slot1, const void *slot2) { 3423 const char **i1 = (const char **)slot1; 3424 const char **i2 = (const char **)slot2; 3425 return strcmp(*i1, *i2); 3426 } 3427 3428 int clusterManagerSlotCountCompareDesc(const void *n1, const void *n2) { 3429 clusterManagerNode *node1 = *((clusterManagerNode **) n1); 3430 clusterManagerNode *node2 = *((clusterManagerNode **) n2); 3431 return node2->slots_count - node1->slots_count; 3432 } 3433 3434 int clusterManagerCompareNodeBalance(const void *n1, const void *n2) { 3435 clusterManagerNode *node1 = *((clusterManagerNode **) n1); 3436 clusterManagerNode *node2 = *((clusterManagerNode **) n2); 3437 return node1->balance - node2->balance; 3438 } 3439 3440 static sds clusterManagerGetConfigSignature(clusterManagerNode *node) { 3441 sds signature = NULL; 3442 int node_count = 0, i = 0, name_len = 0; 3443 char **node_configs = NULL; 3444 redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES"); 3445 if (reply == NULL || reply->type == REDIS_REPLY_ERROR) 3446 goto cleanup; 3447 char *lines = reply->str, *p, *line; 3448 while ((p = strstr(lines, "\n")) != NULL) { 3449 i = 0; 3450 *p = '\0'; 3451 line = lines; 3452 lines = p + 1; 3453 char *nodename = NULL; 3454 int tot_size = 0; 3455 while ((p = strchr(line, ' ')) != NULL) { 3456 *p = '\0'; 3457 char *token = line; 3458 line = p + 1; 3459 if (i == 0) { 3460 nodename = token; 3461 tot_size = (p - token); 3462 name_len = tot_size++; // Make room for ':' in tot_size 3463 } 3464 if (++i == 8) break; 3465 } 3466 if (i != 8) continue; 3467 if (nodename == NULL) continue; 3468 int remaining = strlen(line); 3469 if (remaining == 0) continue; 3470 char **slots = NULL; 3471 int c = 0; 3472 while (remaining > 0) { 3473 p = strchr(line, ' '); 3474 if (p == NULL) p = line + remaining; 3475 int size = (p - line); 3476 remaining -= size; 3477 tot_size += size; 3478 char *slotsdef = line; 3479 *p = '\0'; 3480 if (remaining) { 3481 line = p + 1; 3482 remaining--; 3483 } else line = p; 3484 if (slotsdef[0] != '[') { 3485 c++; 3486 slots = zrealloc(slots, (c * sizeof(char *))); 3487 slots[c - 1] = slotsdef; 3488 } 3489 } 3490 if (c > 0) { 3491 if (c > 1) 3492 qsort(slots, c, sizeof(char *), clusterManagerSlotCompare); 3493 node_count++; 3494 node_configs = 3495 zrealloc(node_configs, (node_count * sizeof(char *))); 3496 /* Make room for '|' separators. */ 3497 tot_size += (sizeof(char) * (c - 1)); 3498 char *cfg = zmalloc((sizeof(char) * tot_size) + 1); 3499 memcpy(cfg, nodename, name_len); 3500 char *sp = cfg + name_len; 3501 *(sp++) = ':'; 3502 for (i = 0; i < c; i++) { 3503 if (i > 0) *(sp++) = ','; 3504 int slen = strlen(slots[i]); 3505 memcpy(sp, slots[i], slen); 3506 sp += slen; 3507 } 3508 *(sp++) = '\0'; 3509 node_configs[node_count - 1] = cfg; 3510 } 3511 zfree(slots); 3512 } 3513 if (node_count > 0) { 3514 if (node_count > 1) { 3515 qsort(node_configs, node_count, sizeof(char *), 3516 clusterManagerSlotCompare); 3517 } 3518 signature = sdsempty(); 3519 for (i = 0; i < node_count; i++) { 3520 if (i > 0) signature = sdscatprintf(signature, "%c", '|'); 3521 signature = sdscatfmt(signature, "%s", node_configs[i]); 3522 } 3523 } 3524 cleanup: 3525 if (reply != NULL) freeReplyObject(reply); 3526 if (node_configs != NULL) { 3527 for (i = 0; i < node_count; i++) zfree(node_configs[i]); 3528 zfree(node_configs); 3529 } 3530 return signature; 3531 } 3532 3533 static int clusterManagerIsConfigConsistent(void) { 3534 if (cluster_manager.nodes == NULL) return 0; 3535 int consistent = (listLength(cluster_manager.nodes) <= 1); 3536 // If the Cluster has only one node, it's always consistent 3537 if (consistent) return 1; 3538 sds first_cfg = NULL; 3539 listIter li; 3540 listNode *ln; 3541 listRewind(cluster_manager.nodes, &li); 3542 while ((ln = listNext(&li)) != NULL) { 3543 clusterManagerNode *node = ln->value; 3544 sds cfg = clusterManagerGetConfigSignature(node); 3545 if (cfg == NULL) { 3546 consistent = 0; 3547 break; 3548 } 3549 if (first_cfg == NULL) first_cfg = cfg; 3550 else { 3551 consistent = !sdscmp(first_cfg, cfg); 3552 sdsfree(cfg); 3553 if (!consistent) break; 3554 } 3555 } 3556 if (first_cfg != NULL) sdsfree(first_cfg); 3557 return consistent; 3558 } 3559 3560 /* Add the error string to cluster_manager.errors and print it. */ 3561 static void clusterManagerOnError(sds err) { 3562 if (cluster_manager.errors == NULL) 3563 cluster_manager.errors = listCreate(); 3564 listAddNodeTail(cluster_manager.errors, err); 3565 clusterManagerLogErr("%s\n", (char *) err); 3566 } 3567 3568 /* Check the slots coverage of the cluster. The 'all_slots' argument must be 3569 * and array of 16384 bytes. Every covered slot will be set to 1 in the 3570 * 'all_slots' array. The function returns the total number if covered slots.*/ 3571 static int clusterManagerGetCoveredSlots(char *all_slots) { 3572 if (cluster_manager.nodes == NULL) return 0; 3573 listIter li; 3574 listNode *ln; 3575 listRewind(cluster_manager.nodes, &li); 3576 int totslots = 0, i; 3577 while ((ln = listNext(&li)) != NULL) { 3578 clusterManagerNode *node = ln->value; 3579 for (i = 0; i < CLUSTER_MANAGER_SLOTS; i++) { 3580 if (node->slots[i] && !all_slots[i]) { 3581 all_slots[i] = 1; 3582 totslots++; 3583 } 3584 } 3585 } 3586 return totslots; 3587 } 3588 3589 static void clusterManagerPrintSlotsList(list *slots) { 3590 listIter li; 3591 listNode *ln; 3592 listRewind(slots, &li); 3593 sds first = NULL; 3594 while ((ln = listNext(&li)) != NULL) { 3595 sds slot = ln->value; 3596 if (!first) first = slot; 3597 else printf(", "); 3598 printf("%s", slot); 3599 } 3600 printf("\n"); 3601 } 3602 3603 /* Return the node, among 'nodes' with the greatest number of keys 3604 * in the specified slot. */ 3605 static clusterManagerNode * clusterManagerGetNodeWithMostKeysInSlot(list *nodes, 3606 int slot, 3607 char **err) 3608 { 3609 clusterManagerNode *node = NULL; 3610 int numkeys = 0; 3611 listIter li; 3612 listNode *ln; 3613 listRewind(nodes, &li); 3614 if (err) *err = NULL; 3615 while ((ln = listNext(&li)) != NULL) { 3616 clusterManagerNode *n = ln->value; 3617 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate) 3618 continue; 3619 redisReply *r = 3620 CLUSTER_MANAGER_COMMAND(n, "CLUSTER COUNTKEYSINSLOT %d", slot); 3621 int success = clusterManagerCheckRedisReply(n, r, err); 3622 if (success) { 3623 if (r->integer > numkeys || node == NULL) { 3624 numkeys = r->integer; 3625 node = n; 3626 } 3627 } 3628 if (r != NULL) freeReplyObject(r); 3629 /* If the reply contains errors */ 3630 if (!success) { 3631 if (err != NULL && *err != NULL) 3632 CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, err); 3633 node = NULL; 3634 break; 3635 } 3636 } 3637 return node; 3638 } 3639 3640 /* This function returns the master that has the least number of replicas 3641 * in the cluster. If there are multiple masters with the same smaller 3642 * number of replicas, one at random is returned. */ 3643 3644 static clusterManagerNode *clusterManagerNodeWithLeastReplicas() { 3645 clusterManagerNode *node = NULL; 3646 int lowest_count = 0; 3647 listIter li; 3648 listNode *ln; 3649 listRewind(cluster_manager.nodes, &li); 3650 while ((ln = listNext(&li)) != NULL) { 3651 clusterManagerNode *n = ln->value; 3652 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; 3653 if (node == NULL || n->replicas_count < lowest_count) { 3654 node = n; 3655 lowest_count = n->replicas_count; 3656 } 3657 } 3658 return node; 3659 } 3660 3661 /* This fucntion returns a random master node, return NULL if none */ 3662 3663 static clusterManagerNode *clusterManagerNodeMasterRandom() { 3664 int master_count = 0; 3665 int idx; 3666 listIter li; 3667 listNode *ln; 3668 listRewind(cluster_manager.nodes, &li); 3669 while ((ln = listNext(&li)) != NULL) { 3670 clusterManagerNode *n = ln->value; 3671 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; 3672 master_count++; 3673 } 3674 3675 srand(time(NULL)); 3676 idx = rand() % master_count; 3677 listRewind(cluster_manager.nodes, &li); 3678 while ((ln = listNext(&li)) != NULL) { 3679 clusterManagerNode *n = ln->value; 3680 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; 3681 if (!idx--) { 3682 return n; 3683 } 3684 } 3685 /* Can not be reached */ 3686 return NULL; 3687 } 3688 3689 static int clusterManagerFixSlotsCoverage(char *all_slots) { 3690 int i, fixed = 0; 3691 list *none = NULL, *single = NULL, *multi = NULL; 3692 clusterManagerLogInfo(">>> Fixing slots coverage...\n"); 3693 printf("List of not covered slots: \n"); 3694 int uncovered_count = 0; 3695 sds log = sdsempty(); 3696 for (i = 0; i < CLUSTER_MANAGER_SLOTS; i++) { 3697 int covered = all_slots[i]; 3698 if (!covered) { 3699 sds key = sdsfromlonglong((long long) i); 3700 if (uncovered_count++ > 0) printf(","); 3701 printf("%s", (char *) key); 3702 list *slot_nodes = listCreate(); 3703 sds slot_nodes_str = sdsempty(); 3704 listIter li; 3705 listNode *ln; 3706 listRewind(cluster_manager.nodes, &li); 3707 while ((ln = listNext(&li)) != NULL) { 3708 clusterManagerNode *n = ln->value; 3709 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate) 3710 continue; 3711 redisReply *reply = CLUSTER_MANAGER_COMMAND(n, 3712 "CLUSTER GETKEYSINSLOT %d %d", i, 1); 3713 if (!clusterManagerCheckRedisReply(n, reply, NULL)) { 3714 fixed = -1; 3715 if (reply) freeReplyObject(reply); 3716 goto cleanup; 3717 } 3718 assert(reply->type == REDIS_REPLY_ARRAY); 3719 if (reply->elements > 0) { 3720 listAddNodeTail(slot_nodes, n); 3721 if (listLength(slot_nodes) > 1) 3722 slot_nodes_str = sdscat(slot_nodes_str, ", "); 3723 slot_nodes_str = sdscatfmt(slot_nodes_str, 3724 "%s:%u", n->ip, n->port); 3725 } 3726 freeReplyObject(reply); 3727 } 3728 log = sdscatfmt(log, "\nSlot %S has keys in %u nodes: %S", 3729 key, listLength(slot_nodes), slot_nodes_str); 3730 sdsfree(slot_nodes_str); 3731 dictAdd(clusterManagerUncoveredSlots, key, slot_nodes); 3732 } 3733 } 3734 printf("\n%s\n", log); 3735 /* For every slot, take action depending on the actual condition: 3736 * 1) No node has keys for this slot. 3737 * 2) A single node has keys for this slot. 3738 * 3) Multiple nodes have keys for this slot. */ 3739 none = listCreate(); 3740 single = listCreate(); 3741 multi = listCreate(); 3742 dictIterator *iter = dictGetIterator(clusterManagerUncoveredSlots); 3743 dictEntry *entry; 3744 while ((entry = dictNext(iter)) != NULL) { 3745 sds slot = (sds) dictGetKey(entry); 3746 list *nodes = (list *) dictGetVal(entry); 3747 switch (listLength(nodes)){ 3748 case 0: listAddNodeTail(none, slot); break; 3749 case 1: listAddNodeTail(single, slot); break; 3750 default: listAddNodeTail(multi, slot); break; 3751 } 3752 } 3753 dictReleaseIterator(iter); 3754 3755 /* Handle case "1": keys in no node. */ 3756 if (listLength(none) > 0) { 3757 printf("The following uncovered slots have no keys " 3758 "across the cluster:\n"); 3759 clusterManagerPrintSlotsList(none); 3760 if (confirmWithYes("Fix these slots by covering with a random node?")){ 3761 listIter li; 3762 listNode *ln; 3763 listRewind(none, &li); 3764 while ((ln = listNext(&li)) != NULL) { 3765 sds slot = ln->value; 3766 int s = atoi(slot); 3767 clusterManagerNode *n = clusterManagerNodeMasterRandom(); 3768 clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", 3769 slot, n->ip, n->port); 3770 if (!clusterManagerSetSlotOwner(n, s, 0)) { 3771 fixed = -1; 3772 goto cleanup; 3773 } 3774 /* Since CLUSTER ADDSLOTS succeeded, we also update the slot 3775 * info into the node struct, in order to keep it synced */ 3776 n->slots[s] = 1; 3777 fixed++; 3778 } 3779 } 3780 } 3781 3782 /* Handle case "2": keys only in one node. */ 3783 if (listLength(single) > 0) { 3784 printf("The following uncovered slots have keys in just one node:\n"); 3785 clusterManagerPrintSlotsList(single); 3786 if (confirmWithYes("Fix these slots by covering with those nodes?")){ 3787 listIter li; 3788 listNode *ln; 3789 listRewind(single, &li); 3790 while ((ln = listNext(&li)) != NULL) { 3791 sds slot = ln->value; 3792 int s = atoi(slot); 3793 dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot); 3794 assert(entry != NULL); 3795 list *nodes = (list *) dictGetVal(entry); 3796 listNode *fn = listFirst(nodes); 3797 assert(fn != NULL); 3798 clusterManagerNode *n = fn->value; 3799 clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", 3800 slot, n->ip, n->port); 3801 if (!clusterManagerSetSlotOwner(n, s, 0)) { 3802 fixed = -1; 3803 goto cleanup; 3804 } 3805 /* Since CLUSTER ADDSLOTS succeeded, we also update the slot 3806 * info into the node struct, in order to keep it synced */ 3807 n->slots[atoi(slot)] = 1; 3808 fixed++; 3809 } 3810 } 3811 } 3812 3813 /* Handle case "3": keys in multiple nodes. */ 3814 if (listLength(multi) > 0) { 3815 printf("The following uncovered slots have keys in multiple nodes:\n"); 3816 clusterManagerPrintSlotsList(multi); 3817 if (confirmWithYes("Fix these slots by moving keys " 3818 "into a single node?")) { 3819 listIter li; 3820 listNode *ln; 3821 listRewind(multi, &li); 3822 while ((ln = listNext(&li)) != NULL) { 3823 sds slot = ln->value; 3824 dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot); 3825 assert(entry != NULL); 3826 list *nodes = (list *) dictGetVal(entry); 3827 int s = atoi(slot); 3828 clusterManagerNode *target = 3829 clusterManagerGetNodeWithMostKeysInSlot(nodes, s, NULL); 3830 if (target == NULL) { 3831 fixed = -1; 3832 goto cleanup; 3833 } 3834 clusterManagerLogInfo(">>> Covering slot %s moving keys " 3835 "to %s:%d\n", slot, 3836 target->ip, target->port); 3837 if (!clusterManagerSetSlotOwner(target, s, 1)) { 3838 fixed = -1; 3839 goto cleanup; 3840 } 3841 /* Since CLUSTER ADDSLOTS succeeded, we also update the slot 3842 * info into the node struct, in order to keep it synced */ 3843 target->slots[atoi(slot)] = 1; 3844 listIter nli; 3845 listNode *nln; 3846 listRewind(nodes, &nli); 3847 while ((nln = listNext(&nli)) != NULL) { 3848 clusterManagerNode *src = nln->value; 3849 if (src == target) continue; 3850 /* Assign the slot to target node in the source node. */ 3851 if (!clusterManagerSetSlot(src, target, s, "NODE", NULL)) 3852 fixed = -1; 3853 if (fixed < 0) goto cleanup; 3854 /* Set the source node in 'importing' state 3855 * (even if we will actually migrate keys away) 3856 * in order to avoid receiving redirections 3857 * for MIGRATE. */ 3858 if (!clusterManagerSetSlot(src, target, s, 3859 "IMPORTING", NULL)) fixed = -1; 3860 if (fixed < 0) goto cleanup; 3861 int opts = CLUSTER_MANAGER_OPT_VERBOSE | 3862 CLUSTER_MANAGER_OPT_COLD; 3863 if (!clusterManagerMoveSlot(src, target, s, opts, NULL)) { 3864 fixed = -1; 3865 goto cleanup; 3866 } 3867 if (!clusterManagerClearSlotStatus(src, s)) 3868 fixed = -1; 3869 if (fixed < 0) goto cleanup; 3870 } 3871 fixed++; 3872 } 3873 } 3874 } 3875 cleanup: 3876 sdsfree(log); 3877 if (none) listRelease(none); 3878 if (single) listRelease(single); 3879 if (multi) listRelease(multi); 3880 return fixed; 3881 } 3882 3883 /* Slot 'slot' was found to be in importing or migrating state in one or 3884 * more nodes. This function fixes this condition by migrating keys where 3885 * it seems more sensible. */ 3886 static int clusterManagerFixOpenSlot(int slot) { 3887 clusterManagerLogInfo(">>> Fixing open slot %d\n", slot); 3888 /* Try to obtain the current slot owner, according to the current 3889 * nodes configuration. */ 3890 int success = 1; 3891 list *owners = listCreate(); 3892 list *migrating = listCreate(); 3893 list *importing = listCreate(); 3894 sds migrating_str = sdsempty(); 3895 sds importing_str = sdsempty(); 3896 clusterManagerNode *owner = NULL; 3897 listIter li; 3898 listNode *ln; 3899 listRewind(cluster_manager.nodes, &li); 3900 while ((ln = listNext(&li)) != NULL) { 3901 clusterManagerNode *n = ln->value; 3902 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; 3903 if (n->slots[slot]) listAddNodeTail(owners, n); 3904 else { 3905 redisReply *r = CLUSTER_MANAGER_COMMAND(n, 3906 "CLUSTER COUNTKEYSINSLOT %d", slot); 3907 success = clusterManagerCheckRedisReply(n, r, NULL); 3908 if (success && r->integer > 0) { 3909 clusterManagerLogWarn("*** Found keys about slot %d " 3910 "in non-owner node %s:%d!\n", slot, 3911 n->ip, n->port); 3912 listAddNodeTail(owners, n); 3913 } 3914 if (r) freeReplyObject(r); 3915 if (!success) goto cleanup; 3916 } 3917 } 3918 if (listLength(owners) == 1) owner = listFirst(owners)->value; 3919 listRewind(cluster_manager.nodes, &li); 3920 while ((ln = listNext(&li)) != NULL) { 3921 clusterManagerNode *n = ln->value; 3922 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; 3923 int is_migrating = 0, is_importing = 0; 3924 if (n->migrating) { 3925 for (int i = 0; i < n->migrating_count; i += 2) { 3926 sds migrating_slot = n->migrating[i]; 3927 if (atoi(migrating_slot) == slot) { 3928 char *sep = (listLength(migrating) == 0 ? "" : ","); 3929 migrating_str = sdscatfmt(migrating_str, "%s%s:%u", 3930 sep, n->ip, n->port); 3931 listAddNodeTail(migrating, n); 3932 is_migrating = 1; 3933 break; 3934 } 3935 } 3936 } 3937 if (!is_migrating && n->importing) { 3938 for (int i = 0; i < n->importing_count; i += 2) { 3939 sds importing_slot = n->importing[i]; 3940 if (atoi(importing_slot) == slot) { 3941 char *sep = (listLength(importing) == 0 ? "" : ","); 3942 importing_str = sdscatfmt(importing_str, "%s%s:%u", 3943 sep, n->ip, n->port); 3944 listAddNodeTail(importing, n); 3945 is_importing = 1; 3946 break; 3947 } 3948 } 3949 } 3950 /* If the node is neither migrating nor importing and it's not 3951 * the owner, then is added to the importing list in case 3952 * it has keys in the slot. */ 3953 if (!is_migrating && !is_importing && n != owner) { 3954 redisReply *r = CLUSTER_MANAGER_COMMAND(n, 3955 "CLUSTER COUNTKEYSINSLOT %d", slot); 3956 success = clusterManagerCheckRedisReply(n, r, NULL); 3957 if (success && r->integer > 0) { 3958 clusterManagerLogWarn("*** Found keys about slot %d " 3959 "in node %s:%d!\n", slot, n->ip, 3960 n->port); 3961 char *sep = (listLength(importing) == 0 ? "" : ","); 3962 importing_str = sdscatfmt(importing_str, "%s%S:%u", 3963 sep, n->ip, n->port); 3964 listAddNodeTail(importing, n); 3965 } 3966 if (r) freeReplyObject(r); 3967 if (!success) goto cleanup; 3968 } 3969 } 3970 if (sdslen(migrating_str) > 0) 3971 printf("Set as migrating in: %s\n", migrating_str); 3972 if (sdslen(importing_str) > 0) 3973 printf("Set as importing in: %s\n", importing_str); 3974 /* If there is no slot owner, set as owner the node with the biggest 3975 * number of keys, among the set of migrating / importing nodes. */ 3976 if (owner == NULL) { 3977 clusterManagerLogInfo(">>> Nobody claims ownership, " 3978 "selecting an owner...\n"); 3979 owner = clusterManagerGetNodeWithMostKeysInSlot(cluster_manager.nodes, 3980 slot, NULL); 3981 // If we still don't have an owner, we can't fix it. 3982 if (owner == NULL) { 3983 clusterManagerLogErr("[ERR] Can't select a slot owner. " 3984 "Impossible to fix.\n"); 3985 success = 0; 3986 goto cleanup; 3987 } 3988 3989 // Use ADDSLOTS to assign the slot. 3990 clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n", 3991 owner->ip, owner->port); 3992 success = clusterManagerClearSlotStatus(owner, slot); 3993 if (!success) goto cleanup; 3994 success = clusterManagerSetSlotOwner(owner, slot, 0); 3995 if (!success) goto cleanup; 3996 /* Since CLUSTER ADDSLOTS succeeded, we also update the slot 3997 * info into the node struct, in order to keep it synced */ 3998 owner->slots[slot] = 1; 3999 /* Make sure this information will propagate. Not strictly needed 4000 * since there is no past owner, so all the other nodes will accept 4001 * whatever epoch this node will claim the slot with. */ 4002 success = clusterManagerBumpEpoch(owner); 4003 if (!success) goto cleanup; 4004 /* Remove the owner from the list of migrating/importing 4005 * nodes. */ 4006 clusterManagerRemoveNodeFromList(migrating, owner); 4007 clusterManagerRemoveNodeFromList(importing, owner); 4008 } 4009 /* If there are multiple owners of the slot, we need to fix it 4010 * so that a single node is the owner and all the other nodes 4011 * are in importing state. Later the fix can be handled by one 4012 * of the base cases above. 4013 * 4014 * Note that this case also covers multiple nodes having the slot 4015 * in migrating state, since migrating is a valid state only for 4016 * slot owners. */ 4017 if (listLength(owners) > 1) { 4018 /* Owner cannot be NULL at this point, since if there are more owners, 4019 * the owner has been set in the previous condition (owner == NULL). */ 4020 assert(owner != NULL); 4021 listRewind(owners, &li); 4022 while ((ln = listNext(&li)) != NULL) { 4023 clusterManagerNode *n = ln->value; 4024 if (n == owner) continue; 4025 success = clusterManagerDelSlot(n, slot, 1); 4026 if (!success) goto cleanup; 4027 n->slots[slot] = 0; 4028 /* Assign the slot to the owner in the node 'n' configuration.' */ 4029 success = clusterManagerSetSlot(n, owner, slot, "node", NULL); 4030 if (!success) goto cleanup; 4031 success = clusterManagerSetSlot(n, owner, slot, "importing", NULL); 4032 if (!success) goto cleanup; 4033 /* Avoid duplicates. */ 4034 clusterManagerRemoveNodeFromList(importing, n); 4035 listAddNodeTail(importing, n); 4036 /* Ensure that the node is not in the migrating list. */ 4037 clusterManagerRemoveNodeFromList(migrating, n); 4038 } 4039 } 4040 int move_opts = CLUSTER_MANAGER_OPT_VERBOSE; 4041 /* Case 1: The slot is in migrating state in one node, and in 4042 * importing state in 1 node. That's trivial to address. */ 4043 if (listLength(migrating) == 1 && listLength(importing) == 1) { 4044 clusterManagerNode *src = listFirst(migrating)->value; 4045 clusterManagerNode *dst = listFirst(importing)->value; 4046 clusterManagerLogInfo(">>> Case 1: Moving slot %d from " 4047 "%s:%d to %s:%d\n", slot, 4048 src->ip, src->port, dst->ip, dst->port); 4049 move_opts |= CLUSTER_MANAGER_OPT_UPDATE; 4050 success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); 4051 } 4052 /* Case 2: There are multiple nodes that claim the slot as importing, 4053 * they probably got keys about the slot after a restart so opened 4054 * the slot. In this case we just move all the keys to the owner 4055 * according to the configuration. */ 4056 else if (listLength(migrating) == 0 && listLength(importing) > 0) { 4057 clusterManagerLogInfo(">>> Case 2: Moving all the %d slot keys to its " 4058 "owner %s:%d\n", slot, owner->ip, owner->port); 4059 move_opts |= CLUSTER_MANAGER_OPT_COLD; 4060 listRewind(importing, &li); 4061 while ((ln = listNext(&li)) != NULL) { 4062 clusterManagerNode *n = ln->value; 4063 if (n == owner) continue; 4064 success = clusterManagerMoveSlot(n, owner, slot, move_opts, NULL); 4065 if (!success) goto cleanup; 4066 clusterManagerLogInfo(">>> Setting %d as STABLE in " 4067 "%s:%d\n", slot, n->ip, n->port); 4068 success = clusterManagerClearSlotStatus(n, slot); 4069 if (!success) goto cleanup; 4070 } 4071 /* Since the slot has been moved in "cold" mode, ensure that all the 4072 * other nodes update their own configuration about the slot itself. */ 4073 listRewind(cluster_manager.nodes, &li); 4074 while ((ln = listNext(&li)) != NULL) { 4075 clusterManagerNode *n = ln->value; 4076 if (n == owner) continue; 4077 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; 4078 success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL); 4079 if (!success) goto cleanup; 4080 } 4081 } 4082 /* Case 3: The slot is in migrating state in one node but multiple 4083 * other nodes claim to be in importing state and don't have any key in 4084 * the slot. We search for the importing node having the same ID as 4085 * the destination node of the migrating node. 4086 * In that case we move the slot from the migrating node to this node and 4087 * we close the importing states on all the other importing nodes. 4088 * If no importing node has the same ID as the destination node of the 4089 * migrating node, the slot's state is closed on both the migrating node 4090 * and the importing nodes. */ 4091 else if (listLength(migrating) == 1 && listLength(importing) > 1) { 4092 int try_to_fix = 1; 4093 clusterManagerNode *src = listFirst(migrating)->value; 4094 clusterManagerNode *dst = NULL; 4095 sds target_id = NULL; 4096 for (int i = 0; i < src->migrating_count; i += 2) { 4097 sds migrating_slot = src->migrating[i]; 4098 if (atoi(migrating_slot) == slot) { 4099 target_id = src->migrating[i + 1]; 4100 break; 4101 } 4102 } 4103 assert(target_id != NULL); 4104 listIter li; 4105 listNode *ln; 4106 listRewind(importing, &li); 4107 while ((ln = listNext(&li)) != NULL) { 4108 clusterManagerNode *n = ln->value; 4109 int count = clusterManagerCountKeysInSlot(n, slot); 4110 if (count > 0) { 4111 try_to_fix = 0; 4112 break; 4113 } 4114 if (strcmp(n->name, target_id) == 0) dst = n; 4115 } 4116 if (!try_to_fix) goto unhandled_case; 4117 if (dst != NULL) { 4118 clusterManagerLogInfo(">>> Case 3: Moving slot %d from %s:%d to " 4119 "%s:%d and closing it on all the other " 4120 "importing nodes.\n", 4121 slot, src->ip, src->port, 4122 dst->ip, dst->port); 4123 /* Move the slot to the destination node. */ 4124 success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); 4125 if (!success) goto cleanup; 4126 /* Close slot on all the other importing nodes. */ 4127 listRewind(importing, &li); 4128 while ((ln = listNext(&li)) != NULL) { 4129 clusterManagerNode *n = ln->value; 4130 if (dst == n) continue; 4131 success = clusterManagerClearSlotStatus(n, slot); 4132 if (!success) goto cleanup; 4133 } 4134 } else { 4135 clusterManagerLogInfo(">>> Case 3: Closing slot %d on both " 4136 "migrating and importing nodes.\n", slot); 4137 /* Close the slot on both the migrating node and the importing 4138 * nodes. */ 4139 success = clusterManagerClearSlotStatus(src, slot); 4140 if (!success) goto cleanup; 4141 listRewind(importing, &li); 4142 while ((ln = listNext(&li)) != NULL) { 4143 clusterManagerNode *n = ln->value; 4144 success = clusterManagerClearSlotStatus(n, slot); 4145 if (!success) goto cleanup; 4146 } 4147 } 4148 } else { 4149 int try_to_close_slot = (listLength(importing) == 0 && 4150 listLength(migrating) == 1); 4151 if (try_to_close_slot) { 4152 clusterManagerNode *n = listFirst(migrating)->value; 4153 if (!owner || owner != n) { 4154 redisReply *r = CLUSTER_MANAGER_COMMAND(n, 4155 "CLUSTER GETKEYSINSLOT %d %d", slot, 10); 4156 success = clusterManagerCheckRedisReply(n, r, NULL); 4157 if (r) { 4158 if (success) try_to_close_slot = (r->elements == 0); 4159 freeReplyObject(r); 4160 } 4161 if (!success) goto cleanup; 4162 } 4163 } 4164 /* Case 4: There are no slots claiming to be in importing state, but 4165 * there is a migrating node that actually don't have any key or is the 4166 * slot owner. We can just close the slot, probably a reshard 4167 * interrupted in the middle. */ 4168 if (try_to_close_slot) { 4169 clusterManagerNode *n = listFirst(migrating)->value; 4170 clusterManagerLogInfo(">>> Case 4: Closing slot %d on %s:%d\n", 4171 slot, n->ip, n->port); 4172 redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", 4173 slot, "STABLE"); 4174 success = clusterManagerCheckRedisReply(n, r, NULL); 4175 if (r) freeReplyObject(r); 4176 if (!success) goto cleanup; 4177 } else { 4178 unhandled_case: 4179 success = 0; 4180 clusterManagerLogErr("[ERR] Sorry, redis-cli can't fix this slot " 4181 "yet (work in progress). Slot is set as " 4182 "migrating in %s, as importing in %s, " 4183 "owner is %s:%d\n", migrating_str, 4184 importing_str, owner->ip, owner->port); 4185 } 4186 } 4187 cleanup: 4188 listRelease(owners); 4189 listRelease(migrating); 4190 listRelease(importing); 4191 sdsfree(migrating_str); 4192 sdsfree(importing_str); 4193 return success; 4194 } 4195 4196 static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { 4197 clusterManagerLogInfo(">>> Fixing multiple owners for slot %d...\n", slot); 4198 int success = 0; 4199 assert(listLength(owners) > 1); 4200 clusterManagerNode *owner = clusterManagerGetNodeWithMostKeysInSlot(owners, 4201 slot, 4202 NULL); 4203 if (!owner) owner = listFirst(owners)->value; 4204 clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d\n", 4205 slot, owner->ip, owner->port); 4206 /* Set the slot owner. */ 4207 if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0; 4208 listIter li; 4209 listNode *ln; 4210 listRewind(cluster_manager.nodes, &li); 4211 /* Update configuration in all the other master nodes by assigning the slot 4212 * itself to the new owner, and by eventually migrating keys if the node 4213 * has keys for the slot. */ 4214 while ((ln = listNext(&li)) != NULL) { 4215 clusterManagerNode *n = ln->value; 4216 if (n == owner) continue; 4217 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; 4218 int count = clusterManagerCountKeysInSlot(n, slot); 4219 success = (count >= 0); 4220 if (!success) break; 4221 clusterManagerDelSlot(n, slot, 1); 4222 if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0; 4223 if (count > 0) { 4224 int opts = CLUSTER_MANAGER_OPT_VERBOSE | 4225 CLUSTER_MANAGER_OPT_COLD; 4226 success = clusterManagerMoveSlot(n, owner, slot, opts, NULL); 4227 if (!success) break; 4228 } 4229 } 4230 return success; 4231 } 4232 4233 static int clusterManagerCheckCluster(int quiet) { 4234 listNode *ln = listFirst(cluster_manager.nodes); 4235 if (!ln) return 0; 4236 clusterManagerNode *node = ln->value; 4237 clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n", 4238 node->ip, node->port); 4239 int result = 1, consistent = 0; 4240 int do_fix = config.cluster_manager_command.flags & 4241 CLUSTER_MANAGER_CMD_FLAG_FIX; 4242 if (!quiet) clusterManagerShowNodes(); 4243 consistent = clusterManagerIsConfigConsistent(); 4244 if (!consistent) { 4245 sds err = sdsnew("[ERR] Nodes don't agree about configuration!"); 4246 clusterManagerOnError(err); 4247 result = 0; 4248 } else { 4249 clusterManagerLogOk("[OK] All nodes agree about slots " 4250 "configuration.\n"); 4251 } 4252 /* Check open slots */ 4253 clusterManagerLogInfo(">>> Check for open slots...\n"); 4254 listIter li; 4255 listRewind(cluster_manager.nodes, &li); 4256 int i; 4257 dict *open_slots = NULL; 4258 while ((ln = listNext(&li)) != NULL) { 4259 clusterManagerNode *n = ln->value; 4260 if (n->migrating != NULL) { 4261 if (open_slots == NULL) 4262 open_slots = dictCreate(&clusterManagerDictType, NULL); 4263 sds errstr = sdsempty(); 4264 errstr = sdscatprintf(errstr, 4265 "[WARNING] Node %s:%d has slots in " 4266 "migrating state ", 4267 n->ip, 4268 n->port); 4269 for (i = 0; i < n->migrating_count; i += 2) { 4270 sds slot = n->migrating[i]; 4271 dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1])); 4272 char *fmt = (i > 0 ? ",%S" : "%S"); 4273 errstr = sdscatfmt(errstr, fmt, slot); 4274 } 4275 errstr = sdscat(errstr, "."); 4276 clusterManagerOnError(errstr); 4277 } 4278 if (n->importing != NULL) { 4279 if (open_slots == NULL) 4280 open_slots = dictCreate(&clusterManagerDictType, NULL); 4281 sds errstr = sdsempty(); 4282 errstr = sdscatprintf(errstr, 4283 "[WARNING] Node %s:%d has slots in " 4284 "importing state ", 4285 n->ip, 4286 n->port); 4287 for (i = 0; i < n->importing_count; i += 2) { 4288 sds slot = n->importing[i]; 4289 dictAdd(open_slots, slot, sdsdup(n->importing[i + 1])); 4290 char *fmt = (i > 0 ? ",%S" : "%S"); 4291 errstr = sdscatfmt(errstr, fmt, slot); 4292 } 4293 errstr = sdscat(errstr, "."); 4294 clusterManagerOnError(errstr); 4295 } 4296 } 4297 if (open_slots != NULL) { 4298 result = 0; 4299 dictIterator *iter = dictGetIterator(open_slots); 4300 dictEntry *entry; 4301 sds errstr = sdsnew("[WARNING] The following slots are open: "); 4302 i = 0; 4303 while ((entry = dictNext(iter)) != NULL) { 4304 sds slot = (sds) dictGetKey(entry); 4305 char *fmt = (i++ > 0 ? ",%S" : "%S"); 4306 errstr = sdscatfmt(errstr, fmt, slot); 4307 } 4308 clusterManagerLogErr("%s.\n", (char *) errstr); 4309 sdsfree(errstr); 4310 if (do_fix) { 4311 /* Fix open slots. */ 4312 dictReleaseIterator(iter); 4313 iter = dictGetIterator(open_slots); 4314 while ((entry = dictNext(iter)) != NULL) { 4315 sds slot = (sds) dictGetKey(entry); 4316 result = clusterManagerFixOpenSlot(atoi(slot)); 4317 if (!result) break; 4318 } 4319 } 4320 dictReleaseIterator(iter); 4321 dictRelease(open_slots); 4322 } 4323 clusterManagerLogInfo(">>> Check slots coverage...\n"); 4324 char slots[CLUSTER_MANAGER_SLOTS]; 4325 memset(slots, 0, CLUSTER_MANAGER_SLOTS); 4326 int coverage = clusterManagerGetCoveredSlots(slots); 4327 if (coverage == CLUSTER_MANAGER_SLOTS) { 4328 clusterManagerLogOk("[OK] All %d slots covered.\n", 4329 CLUSTER_MANAGER_SLOTS); 4330 } else { 4331 sds err = sdsempty(); 4332 err = sdscatprintf(err, "[ERR] Not all %d slots are " 4333 "covered by nodes.\n", 4334 CLUSTER_MANAGER_SLOTS); 4335 clusterManagerOnError(err); 4336 result = 0; 4337 if (do_fix/* && result*/) { 4338 dictType dtype = clusterManagerDictType; 4339 dtype.keyDestructor = dictSdsDestructor; 4340 dtype.valDestructor = dictListDestructor; 4341 clusterManagerUncoveredSlots = dictCreate(&dtype, NULL); 4342 int fixed = clusterManagerFixSlotsCoverage(slots); 4343 if (fixed > 0) result = 1; 4344 } 4345 } 4346 int search_multiple_owners = config.cluster_manager_command.flags & 4347 CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS; 4348 if (search_multiple_owners) { 4349 /* Check whether there are multiple owners, even when slots are 4350 * fully covered and there are no open slots. */ 4351 clusterManagerLogInfo(">>> Check for multiple slot owners...\n"); 4352 int slot = 0; 4353 for (; slot < CLUSTER_MANAGER_SLOTS; slot++) { 4354 listIter li; 4355 listNode *ln; 4356 listRewind(cluster_manager.nodes, &li); 4357 list *owners = listCreate(); 4358 while ((ln = listNext(&li)) != NULL) { 4359 clusterManagerNode *n = ln->value; 4360 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; 4361 if (n->slots[slot]) listAddNodeTail(owners, n); 4362 else { 4363 /* Nodes having keys for the slot will be considered 4364 * owners too. */ 4365 int count = clusterManagerCountKeysInSlot(n, slot); 4366 if (count > 0) listAddNodeTail(owners, n); 4367 } 4368 } 4369 if (listLength(owners) > 1) { 4370 result = 0; 4371 clusterManagerLogErr("[WARNING] Slot %d has %d owners:\n", 4372 slot, listLength(owners)); 4373 listRewind(owners, &li); 4374 while ((ln = listNext(&li)) != NULL) { 4375 clusterManagerNode *n = ln->value; 4376 clusterManagerLogErr(" %s:%d\n", n->ip, n->port); 4377 } 4378 if (do_fix) { 4379 result = clusterManagerFixMultipleSlotOwners(slot, owners); 4380 if (!result) { 4381 clusterManagerLogErr("Failed to fix multiple owners " 4382 "for slot %d\n", slot); 4383 listRelease(owners); 4384 break; 4385 } 4386 } 4387 } 4388 listRelease(owners); 4389 } 4390 } 4391 return result; 4392 } 4393 4394 static clusterManagerNode *clusterNodeForResharding(char *id, 4395 clusterManagerNode *target, 4396 int *raise_err) 4397 { 4398 clusterManagerNode *node = NULL; 4399 const char *invalid_node_msg = "*** The specified node (%s) is not known " 4400 "or not a master, please retry.\n"; 4401 node = clusterManagerNodeByName(id); 4402 *raise_err = 0; 4403 if (!node || node->flags & CLUSTER_MANAGER_FLAG_SLAVE) { 4404 clusterManagerLogErr(invalid_node_msg, id); 4405 *raise_err = 1; 4406 return NULL; 4407 } else if (node != NULL && target != NULL) { 4408 if (!strcmp(node->name, target->name)) { 4409 clusterManagerLogErr( "*** It is not possible to use " 4410 "the target node as " 4411 "source node.\n"); 4412 return NULL; 4413 } 4414 } 4415 return node; 4416 } 4417 4418 static list *clusterManagerComputeReshardTable(list *sources, int numslots) { 4419 list *moved = listCreate(); 4420 int src_count = listLength(sources), i = 0, tot_slots = 0, j; 4421 clusterManagerNode **sorted = zmalloc(src_count * sizeof(*sorted)); 4422 listIter li; 4423 listNode *ln; 4424 listRewind(sources, &li); 4425 while ((ln = listNext(&li)) != NULL) { 4426 clusterManagerNode *node = ln->value; 4427 tot_slots += node->slots_count; 4428 sorted[i++] = node; 4429 } 4430 qsort(sorted, src_count, sizeof(clusterManagerNode *), 4431 clusterManagerSlotCountCompareDesc); 4432 for (i = 0; i < src_count; i++) { 4433 clusterManagerNode *node = sorted[i]; 4434 float n = ((float) numslots / tot_slots * node->slots_count); 4435 if (i == 0) n = ceil(n); 4436 else n = floor(n); 4437 int max = (int) n, count = 0; 4438 for (j = 0; j < CLUSTER_MANAGER_SLOTS; j++) { 4439 int slot = node->slots[j]; 4440 if (!slot) continue; 4441 if (count >= max || (int)listLength(moved) >= numslots) break; 4442 clusterManagerReshardTableItem *item = zmalloc(sizeof(*item)); 4443 item->source = node; 4444 item->slot = j; 4445 listAddNodeTail(moved, item); 4446 count++; 4447 } 4448 } 4449 zfree(sorted); 4450 return moved; 4451 } 4452 4453 static void clusterManagerShowReshardTable(list *table) { 4454 listIter li; 4455 listNode *ln; 4456 listRewind(table, &li); 4457 while ((ln = listNext(&li)) != NULL) { 4458 clusterManagerReshardTableItem *item = ln->value; 4459 clusterManagerNode *n = item->source; 4460 printf(" Moving slot %d from %s\n", item->slot, (char *) n->name); 4461 } 4462 } 4463 4464 static void clusterManagerReleaseReshardTable(list *table) { 4465 if (table != NULL) { 4466 listIter li; 4467 listNode *ln; 4468 listRewind(table, &li); 4469 while ((ln = listNext(&li)) != NULL) { 4470 clusterManagerReshardTableItem *item = ln->value; 4471 zfree(item); 4472 } 4473 listRelease(table); 4474 } 4475 } 4476 4477 static void clusterManagerLog(int level, const char* fmt, ...) { 4478 int use_colors = 4479 (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_COLOR); 4480 if (use_colors) { 4481 printf("\033["); 4482 switch (level) { 4483 case CLUSTER_MANAGER_LOG_LVL_INFO: printf(LOG_COLOR_BOLD); break; 4484 case CLUSTER_MANAGER_LOG_LVL_WARN: printf(LOG_COLOR_YELLOW); break; 4485 case CLUSTER_MANAGER_LOG_LVL_ERR: printf(LOG_COLOR_RED); break; 4486 case CLUSTER_MANAGER_LOG_LVL_SUCCESS: printf(LOG_COLOR_GREEN); break; 4487 default: printf(LOG_COLOR_RESET); break; 4488 } 4489 } 4490 va_list ap; 4491 va_start(ap, fmt); 4492 vprintf(fmt, ap); 4493 va_end(ap); 4494 if (use_colors) printf("\033[" LOG_COLOR_RESET); 4495 } 4496 4497 static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array, 4498 int alloc_len) 4499 { 4500 array->nodes = zcalloc(alloc_len * sizeof(clusterManagerNode*)); 4501 array->alloc = array->nodes; 4502 array->len = alloc_len; 4503 array->count = 0; 4504 } 4505 4506 /* Reset array->nodes to the original array allocation and re-count non-NULL 4507 * nodes. */ 4508 static void clusterManagerNodeArrayReset(clusterManagerNodeArray *array) { 4509 if (array->nodes > array->alloc) { 4510 array->len = array->nodes - array->alloc; 4511 array->nodes = array->alloc; 4512 array->count = 0; 4513 int i = 0; 4514 for(; i < array->len; i++) { 4515 if (array->nodes[i] != NULL) array->count++; 4516 } 4517 } 4518 } 4519 4520 /* Shift array->nodes and store the shifted node into 'nodeptr'. */ 4521 static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array, 4522 clusterManagerNode **nodeptr) 4523 { 4524 assert(array->nodes < (array->nodes + array->len)); 4525 /* If the first node to be shifted is not NULL, decrement count. */ 4526 if (*array->nodes != NULL) array->count--; 4527 /* Store the first node to be shifted into 'nodeptr'. */ 4528 *nodeptr = *array->nodes; 4529 /* Shift the nodes array and decrement length. */ 4530 array->nodes++; 4531 array->len--; 4532 } 4533 4534 static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array, 4535 clusterManagerNode *node) 4536 { 4537 assert(array->nodes < (array->nodes + array->len)); 4538 assert(node != NULL); 4539 assert(array->count < array->len); 4540 array->nodes[array->count++] = node; 4541 } 4542 4543 static void clusterManagerPrintNotEmptyNodeError(clusterManagerNode *node, 4544 char *err) 4545 { 4546 char *msg; 4547 if (err) msg = err; 4548 else { 4549 msg = "is not empty. Either the node already knows other " 4550 "nodes (check with CLUSTER NODES) or contains some " 4551 "key in database 0."; 4552 } 4553 clusterManagerLogErr("[ERR] Node %s:%d %s\n", node->ip, node->port, msg); 4554 } 4555 4556 static void clusterManagerPrintNotClusterNodeError(clusterManagerNode *node, 4557 char *err) 4558 { 4559 char *msg = (err ? err : "is not configured as a cluster node."); 4560 clusterManagerLogErr("[ERR] Node %s:%d %s\n", node->ip, node->port, msg); 4561 } 4562 4563 /* Execute redis-cli in Cluster Manager mode */ 4564 static void clusterManagerMode(clusterManagerCommandProc *proc) { 4565 int argc = config.cluster_manager_command.argc; 4566 char **argv = config.cluster_manager_command.argv; 4567 cluster_manager.nodes = NULL; 4568 if (!proc(argc, argv)) goto cluster_manager_err; 4569 freeClusterManager(); 4570 exit(0); 4571 cluster_manager_err: 4572 freeClusterManager(); 4573 sdsfree(config.hostip); 4574 sdsfree(config.mb_delim); 4575 exit(1); 4576 } 4577 4578 /* Cluster Manager Commands */ 4579 4580 static int clusterManagerCommandCreate(int argc, char **argv) { 4581 int i, j, success = 1; 4582 cluster_manager.nodes = listCreate(); 4583 for (i = 0; i < argc; i++) { 4584 char *addr = argv[i]; 4585 char *c = strrchr(addr, '@'); 4586 if (c != NULL) *c = '\0'; 4587 c = strrchr(addr, ':'); 4588 if (c == NULL) { 4589 fprintf(stderr, "Invalid address format: %s\n", addr); 4590 return 0; 4591 } 4592 *c = '\0'; 4593 char *ip = addr; 4594 int port = atoi(++c); 4595 clusterManagerNode *node = clusterManagerNewNode(ip, port); 4596 if (!clusterManagerNodeConnect(node)) { 4597 freeClusterManagerNode(node); 4598 return 0; 4599 } 4600 char *err = NULL; 4601 if (!clusterManagerNodeIsCluster(node, &err)) { 4602 clusterManagerPrintNotClusterNodeError(node, err); 4603 if (err) zfree(err); 4604 freeClusterManagerNode(node); 4605 return 0; 4606 } 4607 err = NULL; 4608 if (!clusterManagerNodeLoadInfo(node, 0, &err)) { 4609 if (err) { 4610 CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); 4611 zfree(err); 4612 } 4613 freeClusterManagerNode(node); 4614 return 0; 4615 } 4616 err = NULL; 4617 if (!clusterManagerNodeIsEmpty(node, &err)) { 4618 clusterManagerPrintNotEmptyNodeError(node, err); 4619 if (err) zfree(err); 4620 freeClusterManagerNode(node); 4621 return 0; 4622 } 4623 listAddNodeTail(cluster_manager.nodes, node); 4624 } 4625 int node_len = cluster_manager.nodes->len; 4626 int replicas = config.cluster_manager_command.replicas; 4627 int masters_count = CLUSTER_MANAGER_MASTERS_COUNT(node_len, replicas); 4628 if (masters_count < 3) { 4629 clusterManagerLogErr( 4630 "*** ERROR: Invalid configuration for cluster creation.\n" 4631 "*** Redis Cluster requires at least 3 master nodes.\n" 4632 "*** This is not possible with %d nodes and %d replicas per node.", 4633 node_len, replicas); 4634 clusterManagerLogErr("\n*** At least %d nodes are required.\n", 4635 3 * (replicas + 1)); 4636 return 0; 4637 } 4638 clusterManagerLogInfo(">>> Performing hash slots allocation " 4639 "on %d nodes...\n", node_len); 4640 int interleaved_len = 0, ip_count = 0; 4641 clusterManagerNode **interleaved = zcalloc(node_len*sizeof(**interleaved)); 4642 char **ips = zcalloc(node_len * sizeof(char*)); 4643 clusterManagerNodeArray *ip_nodes = zcalloc(node_len * sizeof(*ip_nodes)); 4644 listIter li; 4645 listNode *ln; 4646 listRewind(cluster_manager.nodes, &li); 4647 while ((ln = listNext(&li)) != NULL) { 4648 clusterManagerNode *n = ln->value; 4649 int found = 0; 4650 for (i = 0; i < ip_count; i++) { 4651 char *ip = ips[i]; 4652 if (!strcmp(ip, n->ip)) { 4653 found = 1; 4654 break; 4655 } 4656 } 4657 if (!found) { 4658 ips[ip_count++] = n->ip; 4659 } 4660 clusterManagerNodeArray *node_array = &(ip_nodes[i]); 4661 if (node_array->nodes == NULL) 4662 clusterManagerNodeArrayInit(node_array, node_len); 4663 clusterManagerNodeArrayAdd(node_array, n); 4664 } 4665 while (interleaved_len < node_len) { 4666 for (i = 0; i < ip_count; i++) { 4667 clusterManagerNodeArray *node_array = &(ip_nodes[i]); 4668 if (node_array->count > 0) { 4669 clusterManagerNode *n = NULL; 4670 clusterManagerNodeArrayShift(node_array, &n); 4671 interleaved[interleaved_len++] = n; 4672 } 4673 } 4674 } 4675 clusterManagerNode **masters = interleaved; 4676 interleaved += masters_count; 4677 interleaved_len -= masters_count; 4678 float slots_per_node = CLUSTER_MANAGER_SLOTS / (float) masters_count; 4679 long first = 0; 4680 float cursor = 0.0f; 4681 for (i = 0; i < masters_count; i++) { 4682 clusterManagerNode *master = masters[i]; 4683 long last = lround(cursor + slots_per_node - 1); 4684 if (last > CLUSTER_MANAGER_SLOTS || i == (masters_count - 1)) 4685 last = CLUSTER_MANAGER_SLOTS - 1; 4686 if (last < first) last = first; 4687 printf("Master[%d] -> Slots %lu - %lu\n", i, first, last); 4688 master->slots_count = 0; 4689 for (j = first; j <= last; j++) { 4690 master->slots[j] = 1; 4691 master->slots_count++; 4692 } 4693 master->dirty = 1; 4694 first = last + 1; 4695 cursor += slots_per_node; 4696 } 4697 4698 /* Rotating the list sometimes helps to get better initial 4699 * anti-affinity before the optimizer runs. */ 4700 clusterManagerNode *first_node = interleaved[0]; 4701 for (i = 0; i < (interleaved_len - 1); i++) 4702 interleaved[i] = interleaved[i + 1]; 4703 interleaved[interleaved_len - 1] = first_node; 4704 int assign_unused = 0, available_count = interleaved_len; 4705 assign_replicas: 4706 for (i = 0; i < masters_count; i++) { 4707 clusterManagerNode *master = masters[i]; 4708 int assigned_replicas = 0; 4709 while (assigned_replicas < replicas) { 4710 if (available_count == 0) break; 4711 clusterManagerNode *found = NULL, *slave = NULL; 4712 int firstNodeIdx = -1; 4713 for (j = 0; j < interleaved_len; j++) { 4714 clusterManagerNode *n = interleaved[j]; 4715 if (n == NULL) continue; 4716 if (strcmp(n->ip, master->ip)) { 4717 found = n; 4718 interleaved[j] = NULL; 4719 break; 4720 } 4721 if (firstNodeIdx < 0) firstNodeIdx = j; 4722 } 4723 if (found) slave = found; 4724 else if (firstNodeIdx >= 0) { 4725 slave = interleaved[firstNodeIdx]; 4726 interleaved_len -= (interleaved - (interleaved + firstNodeIdx)); 4727 interleaved += (firstNodeIdx + 1); 4728 } 4729 if (slave != NULL) { 4730 assigned_replicas++; 4731 available_count--; 4732 if (slave->replicate) sdsfree(slave->replicate); 4733 slave->replicate = sdsnew(master->name); 4734 slave->dirty = 1; 4735 } else break; 4736 printf("Adding replica %s:%d to %s:%d\n", slave->ip, slave->port, 4737 master->ip, master->port); 4738 if (assign_unused) break; 4739 } 4740 } 4741 if (!assign_unused && available_count > 0) { 4742 assign_unused = 1; 4743 printf("Adding extra replicas...\n"); 4744 goto assign_replicas; 4745 } 4746 for (i = 0; i < ip_count; i++) { 4747 clusterManagerNodeArray *node_array = ip_nodes + i; 4748 clusterManagerNodeArrayReset(node_array); 4749 } 4750 clusterManagerOptimizeAntiAffinity(ip_nodes, ip_count); 4751 clusterManagerShowNodes(); 4752 if (confirmWithYes("Can I set the above configuration?")) { 4753 listRewind(cluster_manager.nodes, &li); 4754 while ((ln = listNext(&li)) != NULL) { 4755 clusterManagerNode *node = ln->value; 4756 char *err = NULL; 4757 int flushed = clusterManagerFlushNodeConfig(node, &err); 4758 if (!flushed && node->dirty && !node->replicate) { 4759 if (err != NULL) { 4760 CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); 4761 zfree(err); 4762 } 4763 success = 0; 4764 goto cleanup; 4765 } else if (err != NULL) zfree(err); 4766 } 4767 clusterManagerLogInfo(">>> Nodes configuration updated\n"); 4768 clusterManagerLogInfo(">>> Assign a different config epoch to " 4769 "each node\n"); 4770 int config_epoch = 1; 4771 listRewind(cluster_manager.nodes, &li); 4772 while ((ln = listNext(&li)) != NULL) { 4773 clusterManagerNode *node = ln->value; 4774 redisReply *reply = NULL; 4775 reply = CLUSTER_MANAGER_COMMAND(node, 4776 "cluster set-config-epoch %d", 4777 config_epoch++); 4778 if (reply != NULL) freeReplyObject(reply); 4779 } 4780 clusterManagerLogInfo(">>> Sending CLUSTER MEET messages to join " 4781 "the cluster\n"); 4782 clusterManagerNode *first = NULL; 4783 listRewind(cluster_manager.nodes, &li); 4784 while ((ln = listNext(&li)) != NULL) { 4785 clusterManagerNode *node = ln->value; 4786 if (first == NULL) { 4787 first = node; 4788 continue; 4789 } 4790 redisReply *reply = NULL; 4791 reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d", 4792 first->ip, first->port); 4793 int is_err = 0; 4794 if (reply != NULL) { 4795 if ((is_err = reply->type == REDIS_REPLY_ERROR)) 4796 CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, reply->str); 4797 freeReplyObject(reply); 4798 } else { 4799 is_err = 1; 4800 fprintf(stderr, "Failed to send CLUSTER MEET command.\n"); 4801 } 4802 if (is_err) { 4803 success = 0; 4804 goto cleanup; 4805 } 4806 } 4807 /* Give one second for the join to start, in order to avoid that 4808 * waiting for cluster join will find all the nodes agree about 4809 * the config as they are still empty with unassigned slots. */ 4810 sleep(1); 4811 clusterManagerWaitForClusterJoin(); 4812 /* Useful for the replicas */ 4813 listRewind(cluster_manager.nodes, &li); 4814 while ((ln = listNext(&li)) != NULL) { 4815 clusterManagerNode *node = ln->value; 4816 if (!node->dirty) continue; 4817 char *err = NULL; 4818 int flushed = clusterManagerFlushNodeConfig(node, &err); 4819 if (!flushed && !node->replicate) { 4820 if (err != NULL) { 4821 CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); 4822 zfree(err); 4823 } 4824 success = 0; 4825 goto cleanup; 4826 } 4827 } 4828 // Reset Nodes 4829 listRewind(cluster_manager.nodes, &li); 4830 clusterManagerNode *first_node = NULL; 4831 while ((ln = listNext(&li)) != NULL) { 4832 clusterManagerNode *node = ln->value; 4833 if (!first_node) first_node = node; 4834 else freeClusterManagerNode(node); 4835 } 4836 listEmpty(cluster_manager.nodes); 4837 if (!clusterManagerLoadInfoFromNode(first_node, 0)) { 4838 success = 0; 4839 goto cleanup; 4840 } 4841 clusterManagerCheckCluster(0); 4842 } 4843 cleanup: 4844 /* Free everything */ 4845 zfree(masters); 4846 zfree(ips); 4847 for (i = 0; i < node_len; i++) { 4848 clusterManagerNodeArray *node_array = ip_nodes + i; 4849 CLUSTER_MANAGER_NODE_ARRAY_FREE(node_array); 4850 } 4851 zfree(ip_nodes); 4852 return success; 4853 } 4854 4855 static int clusterManagerCommandAddNode(int argc, char **argv) { 4856 int success = 1; 4857 redisReply *reply = NULL; 4858 char *ref_ip = NULL, *ip = NULL; 4859 int ref_port = 0, port = 0; 4860 if (!getClusterHostFromCmdArgs(argc - 1, argv + 1, &ref_ip, &ref_port)) 4861 goto invalid_args; 4862 if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) 4863 goto invalid_args; 4864 clusterManagerLogInfo(">>> Adding node %s:%d to cluster %s:%d\n", ip, port, 4865 ref_ip, ref_port); 4866 // Check the existing cluster 4867 clusterManagerNode *refnode = clusterManagerNewNode(ref_ip, ref_port); 4868 if (!clusterManagerLoadInfoFromNode(refnode, 0)) return 0; 4869 if (!clusterManagerCheckCluster(0)) return 0; 4870 4871 /* If --cluster-master-id was specified, try to resolve it now so that we 4872 * abort before starting with the node configuration. */ 4873 clusterManagerNode *master_node = NULL; 4874 if (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_SLAVE) { 4875 char *master_id = config.cluster_manager_command.master_id; 4876 if (master_id != NULL) { 4877 master_node = clusterManagerNodeByName(master_id); 4878 if (master_node == NULL) { 4879 clusterManagerLogErr("[ERR] No such master ID %s\n", master_id); 4880 return 0; 4881 } 4882 } else { 4883 master_node = clusterManagerNodeWithLeastReplicas(); 4884 assert(master_node != NULL); 4885 printf("Automatically selected master %s:%d\n", master_node->ip, 4886 master_node->port); 4887 } 4888 } 4889 4890 // Add the new node 4891 clusterManagerNode *new_node = clusterManagerNewNode(ip, port); 4892 int added = 0; 4893 if (!clusterManagerNodeConnect(new_node)) { 4894 clusterManagerLogErr("[ERR] Sorry, can't connect to node %s:%d\n", 4895 ip, port); 4896 success = 0; 4897 goto cleanup; 4898 } 4899 char *err = NULL; 4900 if (!(success = clusterManagerNodeIsCluster(new_node, &err))) { 4901 clusterManagerPrintNotClusterNodeError(new_node, err); 4902 if (err) zfree(err); 4903 goto cleanup; 4904 } 4905 if (!clusterManagerNodeLoadInfo(new_node, 0, &err)) { 4906 if (err) { 4907 CLUSTER_MANAGER_PRINT_REPLY_ERROR(new_node, err); 4908 zfree(err); 4909 } 4910 success = 0; 4911 goto cleanup; 4912 } 4913 if (!(success = clusterManagerNodeIsEmpty(new_node, &err))) { 4914 clusterManagerPrintNotEmptyNodeError(new_node, err); 4915 if (err) zfree(err); 4916 goto cleanup; 4917 } 4918 clusterManagerNode *first = listFirst(cluster_manager.nodes)->value; 4919 listAddNodeTail(cluster_manager.nodes, new_node); 4920 added = 1; 4921 4922 // Send CLUSTER MEET command to the new node 4923 clusterManagerLogInfo(">>> Send CLUSTER MEET to node %s:%d to make it " 4924 "join the cluster.\n", ip, port); 4925 reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d", 4926 first->ip, first->port); 4927 if (!(success = clusterManagerCheckRedisReply(new_node, reply, NULL))) 4928 goto cleanup; 4929 4930 /* Additional configuration is needed if the node is added as a slave. */ 4931 if (master_node) { 4932 sleep(1); 4933 clusterManagerWaitForClusterJoin(); 4934 clusterManagerLogInfo(">>> Configure node as replica of %s:%d.\n", 4935 master_node->ip, master_node->port); 4936 freeReplyObject(reply); 4937 reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER REPLICATE %s", 4938 master_node->name); 4939 if (!(success = clusterManagerCheckRedisReply(new_node, reply, NULL))) 4940 goto cleanup; 4941 } 4942 clusterManagerLogOk("[OK] New node added correctly.\n"); 4943 cleanup: 4944 if (!added && new_node) freeClusterManagerNode(new_node); 4945 if (reply) freeReplyObject(reply); 4946 return success; 4947 invalid_args: 4948 fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); 4949 return 0; 4950 } 4951 4952 static int clusterManagerCommandDeleteNode(int argc, char **argv) { 4953 UNUSED(argc); 4954 int success = 1; 4955 int port = 0; 4956 char *ip = NULL; 4957 if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args; 4958 char *node_id = argv[1]; 4959 clusterManagerLogInfo(">>> Removing node %s from cluster %s:%d\n", 4960 node_id, ip, port); 4961 clusterManagerNode *ref_node = clusterManagerNewNode(ip, port); 4962 clusterManagerNode *node = NULL; 4963 4964 // Load cluster information 4965 if (!clusterManagerLoadInfoFromNode(ref_node, 0)) return 0; 4966 4967 // Check if the node exists and is not empty 4968 node = clusterManagerNodeByName(node_id); 4969 if (node == NULL) { 4970 clusterManagerLogErr("[ERR] No such node ID %s\n", node_id); 4971 return 0; 4972 } 4973 if (node->slots_count != 0) { 4974 clusterManagerLogErr("[ERR] Node %s:%d is not empty! Reshard data " 4975 "away and try again.\n", node->ip, node->port); 4976 return 0; 4977 } 4978 4979 // Send CLUSTER FORGET to all the nodes but the node to remove 4980 clusterManagerLogInfo(">>> Sending CLUSTER FORGET messages to the " 4981 "cluster...\n"); 4982 listIter li; 4983 listNode *ln; 4984 listRewind(cluster_manager.nodes, &li); 4985 while ((ln = listNext(&li)) != NULL) { 4986 clusterManagerNode *n = ln->value; 4987 if (n == node) continue; 4988 if (n->replicate && !strcasecmp(n->replicate, node_id)) { 4989 // Reconfigure the slave to replicate with some other node 4990 clusterManagerNode *master = clusterManagerNodeWithLeastReplicas(); 4991 assert(master != NULL); 4992 clusterManagerLogInfo(">>> %s:%d as replica of %s:%d\n", 4993 n->ip, n->port, master->ip, master->port); 4994 redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER REPLICATE %s", 4995 master->name); 4996 success = clusterManagerCheckRedisReply(n, r, NULL); 4997 if (r) freeReplyObject(r); 4998 if (!success) return 0; 4999 } 5000 redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER FORGET %s", 5001 node_id); 5002 success = clusterManagerCheckRedisReply(n, r, NULL); 5003 if (r) freeReplyObject(r); 5004 if (!success) return 0; 5005 } 5006 5007 // Finally shutdown the node 5008 clusterManagerLogInfo(">>> SHUTDOWN the node.\n"); 5009 redisReply *r = redisCommand(node->context, "SHUTDOWN"); 5010 success = clusterManagerCheckRedisReply(node, r, NULL); 5011 if (r) freeReplyObject(r); 5012 return success; 5013 invalid_args: 5014 fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); 5015 return 0; 5016 } 5017 5018 static int clusterManagerCommandInfo(int argc, char **argv) { 5019 int port = 0; 5020 char *ip = NULL; 5021 if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; 5022 clusterManagerNode *node = clusterManagerNewNode(ip, port); 5023 if (!clusterManagerLoadInfoFromNode(node, 0)) return 0; 5024 clusterManagerShowClusterInfo(); 5025 return 1; 5026 invalid_args: 5027 fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); 5028 return 0; 5029 } 5030 5031 static int clusterManagerCommandCheck(int argc, char **argv) { 5032 int port = 0; 5033 char *ip = NULL; 5034 if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; 5035 clusterManagerNode *node = clusterManagerNewNode(ip, port); 5036 if (!clusterManagerLoadInfoFromNode(node, 0)) return 0; 5037 clusterManagerShowClusterInfo(); 5038 return clusterManagerCheckCluster(0); 5039 invalid_args: 5040 fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); 5041 return 0; 5042 } 5043 5044 static int clusterManagerCommandFix(int argc, char **argv) { 5045 config.cluster_manager_command.flags |= CLUSTER_MANAGER_CMD_FLAG_FIX; 5046 return clusterManagerCommandCheck(argc, argv); 5047 } 5048 5049 static int clusterManagerCommandReshard(int argc, char **argv) { 5050 int port = 0; 5051 char *ip = NULL; 5052 if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; 5053 clusterManagerNode *node = clusterManagerNewNode(ip, port); 5054 if (!clusterManagerLoadInfoFromNode(node, 0)) return 0; 5055 clusterManagerCheckCluster(0); 5056 if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) { 5057 fflush(stdout); 5058 fprintf(stderr, 5059 "*** Please fix your cluster problems before resharding\n"); 5060 return 0; 5061 } 5062 int slots = config.cluster_manager_command.slots; 5063 if (!slots) { 5064 while (slots <= 0 || slots > CLUSTER_MANAGER_SLOTS) { 5065 printf("How many slots do you want to move (from 1 to %d)? ", 5066 CLUSTER_MANAGER_SLOTS); 5067 fflush(stdout); 5068 char buf[6]; 5069 int nread = read(fileno(stdin),buf,6); 5070 if (nread <= 0) continue; 5071 int last_idx = nread - 1; 5072 if (buf[last_idx] != '\n') { 5073 int ch; 5074 while ((ch = getchar()) != '\n' && ch != EOF) {} 5075 } 5076 buf[last_idx] = '\0'; 5077 slots = atoi(buf); 5078 } 5079 } 5080 char buf[255]; 5081 char *to = config.cluster_manager_command.to, 5082 *from = config.cluster_manager_command.from; 5083 while (to == NULL) { 5084 printf("What is the receiving node ID? "); 5085 fflush(stdout); 5086 int nread = read(fileno(stdin),buf,255); 5087 if (nread <= 0) continue; 5088 int last_idx = nread - 1; 5089 if (buf[last_idx] != '\n') { 5090 int ch; 5091 while ((ch = getchar()) != '\n' && ch != EOF) {} 5092 } 5093 buf[last_idx] = '\0'; 5094 if (strlen(buf) > 0) to = buf; 5095 } 5096 int raise_err = 0; 5097 clusterManagerNode *target = clusterNodeForResharding(to, NULL, &raise_err); 5098 if (target == NULL) return 0; 5099 list *sources = listCreate(); 5100 list *table = NULL; 5101 int all = 0, result = 1; 5102 if (from == NULL) { 5103 printf("Please enter all the source node IDs.\n"); 5104 printf(" Type 'all' to use all the nodes as source nodes for " 5105 "the hash slots.\n"); 5106 printf(" Type 'done' once you entered all the source nodes IDs.\n"); 5107 while (1) { 5108 printf("Source node #%lu: ", listLength(sources) + 1); 5109 fflush(stdout); 5110 int nread = read(fileno(stdin),buf,255); 5111 if (nread <= 0) continue; 5112 int last_idx = nread - 1; 5113 if (buf[last_idx] != '\n') { 5114 int ch; 5115 while ((ch = getchar()) != '\n' && ch != EOF) {} 5116 } 5117 buf[last_idx] = '\0'; 5118 if (!strcmp(buf, "done")) break; 5119 else if (!strcmp(buf, "all")) { 5120 all = 1; 5121 break; 5122 } else { 5123 clusterManagerNode *src = 5124 clusterNodeForResharding(buf, target, &raise_err); 5125 if (src != NULL) listAddNodeTail(sources, src); 5126 else if (raise_err) { 5127 result = 0; 5128 goto cleanup; 5129 } 5130 } 5131 } 5132 } else { 5133 char *p; 5134 while((p = strchr(from, ',')) != NULL) { 5135 *p = '\0'; 5136 if (!strcmp(from, "all")) { 5137 all = 1; 5138 break; 5139 } else { 5140 clusterManagerNode *src = 5141 clusterNodeForResharding(from, target, &raise_err); 5142 if (src != NULL) listAddNodeTail(sources, src); 5143 else if (raise_err) { 5144 result = 0; 5145 goto cleanup; 5146 } 5147 } 5148 from = p + 1; 5149 } 5150 /* Check if there's still another source to process. */ 5151 if (!all && strlen(from) > 0) { 5152 if (!strcmp(from, "all")) all = 1; 5153 if (!all) { 5154 clusterManagerNode *src = 5155 clusterNodeForResharding(from, target, &raise_err); 5156 if (src != NULL) listAddNodeTail(sources, src); 5157 else if (raise_err) { 5158 result = 0; 5159 goto cleanup; 5160 } 5161 } 5162 } 5163 } 5164 listIter li; 5165 listNode *ln; 5166 if (all) { 5167 listEmpty(sources); 5168 listRewind(cluster_manager.nodes, &li); 5169 while ((ln = listNext(&li)) != NULL) { 5170 clusterManagerNode *n = ln->value; 5171 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate) 5172 continue; 5173 if (!sdscmp(n->name, target->name)) continue; 5174 listAddNodeTail(sources, n); 5175 } 5176 } 5177 if (listLength(sources) == 0) { 5178 fprintf(stderr, "*** No source nodes given, operation aborted.\n"); 5179 result = 0; 5180 goto cleanup; 5181 } 5182 printf("\nReady to move %d slots.\n", slots); 5183 printf(" Source nodes:\n"); 5184 listRewind(sources, &li); 5185 while ((ln = listNext(&li)) != NULL) { 5186 clusterManagerNode *src = ln->value; 5187 sds info = clusterManagerNodeInfo(src, 4); 5188 printf("%s\n", info); 5189 sdsfree(info); 5190 } 5191 printf(" Destination node:\n"); 5192 sds info = clusterManagerNodeInfo(target, 4); 5193 printf("%s\n", info); 5194 sdsfree(info); 5195 table = clusterManagerComputeReshardTable(sources, slots); 5196 printf(" Resharding plan:\n"); 5197 clusterManagerShowReshardTable(table); 5198 if (!(config.cluster_manager_command.flags & 5199 CLUSTER_MANAGER_CMD_FLAG_YES)) 5200 { 5201 printf("Do you want to proceed with the proposed " 5202 "reshard plan (yes/no)? "); 5203 fflush(stdout); 5204 char buf[4]; 5205 int nread = read(fileno(stdin),buf,4); 5206 buf[3] = '\0'; 5207 if (nread <= 0 || strcmp("yes", buf) != 0) { 5208 result = 0; 5209 goto cleanup; 5210 } 5211 } 5212 int opts = CLUSTER_MANAGER_OPT_VERBOSE; 5213 listRewind(table, &li); 5214 while ((ln = listNext(&li)) != NULL) { 5215 clusterManagerReshardTableItem *item = ln->value; 5216 char *err = NULL; 5217 result = clusterManagerMoveSlot(item->source, target, item->slot, 5218 opts, &err); 5219 if (!result) { 5220 if (err != NULL) { 5221 //clusterManagerLogErr("\n%s\n", err); 5222 zfree(err); 5223 } 5224 goto cleanup; 5225 } 5226 } 5227 cleanup: 5228 listRelease(sources); 5229 clusterManagerReleaseReshardTable(table); 5230 return result; 5231 invalid_args: 5232 fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); 5233 return 0; 5234 } 5235 5236 static int clusterManagerCommandRebalance(int argc, char **argv) { 5237 int port = 0; 5238 char *ip = NULL; 5239 clusterManagerNode **weightedNodes = NULL; 5240 list *involved = NULL; 5241 if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; 5242 clusterManagerNode *node = clusterManagerNewNode(ip, port); 5243 if (!clusterManagerLoadInfoFromNode(node, 0)) return 0; 5244 int result = 1, i; 5245 if (config.cluster_manager_command.weight != NULL) { 5246 for (i = 0; i < config.cluster_manager_command.weight_argc; i++) { 5247 char *name = config.cluster_manager_command.weight[i]; 5248 char *p = strchr(name, '='); 5249 if (p == NULL) { 5250 result = 0; 5251 goto cleanup; 5252 } 5253 *p = '\0'; 5254 float w = atof(++p); 5255 clusterManagerNode *n = clusterManagerNodeByAbbreviatedName(name); 5256 if (n == NULL) { 5257 clusterManagerLogErr("*** No such master node %s\n", name); 5258 result = 0; 5259 goto cleanup; 5260 } 5261 n->weight = w; 5262 } 5263 } 5264 float total_weight = 0; 5265 int nodes_involved = 0; 5266 int use_empty = config.cluster_manager_command.flags & 5267 CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER; 5268 involved = listCreate(); 5269 listIter li; 5270 listNode *ln; 5271 listRewind(cluster_manager.nodes, &li); 5272 /* Compute the total cluster weight. */ 5273 while ((ln = listNext(&li)) != NULL) { 5274 clusterManagerNode *n = ln->value; 5275 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate) 5276 continue; 5277 if (!use_empty && n->slots_count == 0) { 5278 n->weight = 0; 5279 continue; 5280 } 5281 total_weight += n->weight; 5282 nodes_involved++; 5283 listAddNodeTail(involved, n); 5284 } 5285 weightedNodes = zmalloc(nodes_involved * sizeof(clusterManagerNode *)); 5286 if (weightedNodes == NULL) goto cleanup; 5287 /* Check cluster, only proceed if it looks sane. */ 5288 clusterManagerCheckCluster(1); 5289 if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) { 5290 clusterManagerLogErr("*** Please fix your cluster problems " 5291 "before rebalancing\n"); 5292 result = 0; 5293 goto cleanup; 5294 } 5295 /* Calculate the slots balance for each node. It's the number of 5296 * slots the node should lose (if positive) or gain (if negative) 5297 * in order to be balanced. */ 5298 int threshold_reached = 0, total_balance = 0; 5299 float threshold = config.cluster_manager_command.threshold; 5300 i = 0; 5301 listRewind(involved, &li); 5302 while ((ln = listNext(&li)) != NULL) { 5303 clusterManagerNode *n = ln->value; 5304 weightedNodes[i++] = n; 5305 int expected = (int) (((float)CLUSTER_MANAGER_SLOTS / total_weight) * 5306 n->weight); 5307 n->balance = n->slots_count - expected; 5308 total_balance += n->balance; 5309 /* Compute the percentage of difference between the 5310 * expected number of slots and the real one, to see 5311 * if it's over the threshold specified by the user. */ 5312 int over_threshold = 0; 5313 if (threshold > 0) { 5314 if (n->slots_count > 0) { 5315 float err_perc = fabs((100-(100.0*expected/n->slots_count))); 5316 if (err_perc > threshold) over_threshold = 1; 5317 } else if (expected > 1) { 5318 over_threshold = 1; 5319 } 5320 } 5321 if (over_threshold) threshold_reached = 1; 5322 } 5323 if (!threshold_reached) { 5324 clusterManagerLogWarn("*** No rebalancing needed! " 5325 "All nodes are within the %.2f%% threshold.\n", 5326 config.cluster_manager_command.threshold); 5327 goto cleanup; 5328 } 5329 /* Because of rounding, it is possible that the balance of all nodes 5330 * summed does not give 0. Make sure that nodes that have to provide 5331 * slots are always matched by nodes receiving slots. */ 5332 while (total_balance > 0) { 5333 listRewind(involved, &li); 5334 while ((ln = listNext(&li)) != NULL) { 5335 clusterManagerNode *n = ln->value; 5336 if (n->balance <= 0 && total_balance > 0) { 5337 n->balance--; 5338 total_balance--; 5339 } 5340 } 5341 } 5342 /* Sort nodes by their slots balance. */ 5343 qsort(weightedNodes, nodes_involved, sizeof(clusterManagerNode *), 5344 clusterManagerCompareNodeBalance); 5345 clusterManagerLogInfo(">>> Rebalancing across %d nodes. " 5346 "Total weight = %.2f\n", 5347 nodes_involved, total_weight); 5348 if (config.verbose) { 5349 for (i = 0; i < nodes_involved; i++) { 5350 clusterManagerNode *n = weightedNodes[i]; 5351 printf("%s:%d balance is %d slots\n", n->ip, n->port, n->balance); 5352 } 5353 } 5354 /* Now we have at the start of the 'sn' array nodes that should get 5355 * slots, at the end nodes that must give slots. 5356 * We take two indexes, one at the start, and one at the end, 5357 * incrementing or decrementing the indexes accordingly til we 5358 * find nodes that need to get/provide slots. */ 5359 int dst_idx = 0; 5360 int src_idx = nodes_involved - 1; 5361 int simulate = config.cluster_manager_command.flags & 5362 CLUSTER_MANAGER_CMD_FLAG_SIMULATE; 5363 while (dst_idx < src_idx) { 5364 clusterManagerNode *dst = weightedNodes[dst_idx]; 5365 clusterManagerNode *src = weightedNodes[src_idx]; 5366 int db = abs(dst->balance); 5367 int sb = abs(src->balance); 5368 int numslots = (db < sb ? db : sb); 5369 if (numslots > 0) { 5370 printf("Moving %d slots from %s:%d to %s:%d\n", numslots, 5371 src->ip, 5372 src->port, 5373 dst->ip, 5374 dst->port); 5375 /* Actually move the slots. */ 5376 list *lsrc = listCreate(), *table = NULL; 5377 listAddNodeTail(lsrc, src); 5378 table = clusterManagerComputeReshardTable(lsrc, numslots); 5379 listRelease(lsrc); 5380 int table_len = (int) listLength(table); 5381 if (!table || table_len != numslots) { 5382 clusterManagerLogErr("*** Assertion failed: Reshard table " 5383 "!= number of slots"); 5384 result = 0; 5385 goto end_move; 5386 } 5387 if (simulate) { 5388 for (i = 0; i < table_len; i++) printf("#"); 5389 } else { 5390 int opts = CLUSTER_MANAGER_OPT_QUIET | 5391 CLUSTER_MANAGER_OPT_UPDATE; 5392 listRewind(table, &li); 5393 while ((ln = listNext(&li)) != NULL) { 5394 clusterManagerReshardTableItem *item = ln->value; 5395 result = clusterManagerMoveSlot(item->source, 5396 dst, 5397 item->slot, 5398 opts, NULL); 5399 if (!result) goto end_move; 5400 printf("#"); 5401 fflush(stdout); 5402 } 5403 5404 } 5405 printf("\n"); 5406 end_move: 5407 clusterManagerReleaseReshardTable(table); 5408 if (!result) goto cleanup; 5409 } 5410 /* Update nodes balance. */ 5411 dst->balance += numslots; 5412 src->balance -= numslots; 5413 if (dst->balance == 0) dst_idx++; 5414 if (src->balance == 0) src_idx --; 5415 } 5416 cleanup: 5417 if (involved != NULL) listRelease(involved); 5418 if (weightedNodes != NULL) zfree(weightedNodes); 5419 return result; 5420 invalid_args: 5421 fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); 5422 return 0; 5423 } 5424 5425 static int clusterManagerCommandSetTimeout(int argc, char **argv) { 5426 UNUSED(argc); 5427 int port = 0; 5428 char *ip = NULL; 5429 if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args; 5430 int timeout = atoi(argv[1]); 5431 if (timeout < 100) { 5432 fprintf(stderr, "Setting a node timeout of less than 100 " 5433 "milliseconds is a bad idea.\n"); 5434 return 0; 5435 } 5436 // Load cluster information 5437 clusterManagerNode *node = clusterManagerNewNode(ip, port); 5438 if (!clusterManagerLoadInfoFromNode(node, 0)) return 0; 5439 int ok_count = 0, err_count = 0; 5440 5441 clusterManagerLogInfo(">>> Reconfiguring node timeout in every " 5442 "cluster node...\n"); 5443 listIter li; 5444 listNode *ln; 5445 listRewind(cluster_manager.nodes, &li); 5446 while ((ln = listNext(&li)) != NULL) { 5447 clusterManagerNode *n = ln->value; 5448 char *err = NULL; 5449 redisReply *reply = CLUSTER_MANAGER_COMMAND(n, "CONFIG %s %s %d", 5450 "SET", 5451 "cluster-node-timeout", 5452 timeout); 5453 if (reply == NULL) goto reply_err; 5454 int ok = clusterManagerCheckRedisReply(n, reply, &err); 5455 freeReplyObject(reply); 5456 if (!ok) goto reply_err; 5457 reply = CLUSTER_MANAGER_COMMAND(n, "CONFIG %s", "REWRITE"); 5458 if (reply == NULL) goto reply_err; 5459 ok = clusterManagerCheckRedisReply(n, reply, &err); 5460 freeReplyObject(reply); 5461 if (!ok) goto reply_err; 5462 clusterManagerLogWarn("*** New timeout set for %s:%d\n", n->ip, 5463 n->port); 5464 ok_count++; 5465 continue; 5466 reply_err:; 5467 int need_free = 0; 5468 if (err == NULL) err = ""; 5469 else need_free = 1; 5470 clusterManagerLogErr("ERR setting node-timeot for %s:%d: %s\n", n->ip, 5471 n->port, err); 5472 if (need_free) zfree(err); 5473 err_count++; 5474 } 5475 clusterManagerLogInfo(">>> New node timeout set. %d OK, %d ERR.\n", 5476 ok_count, err_count); 5477 return 1; 5478 invalid_args: 5479 fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); 5480 return 0; 5481 } 5482 5483 static int clusterManagerCommandImport(int argc, char **argv) { 5484 int success = 1; 5485 int port = 0, src_port = 0; 5486 char *ip = NULL, *src_ip = NULL; 5487 char *invalid_args_msg = NULL; 5488 if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) { 5489 invalid_args_msg = CLUSTER_MANAGER_INVALID_HOST_ARG; 5490 goto invalid_args; 5491 } 5492 if (config.cluster_manager_command.from == NULL) { 5493 invalid_args_msg = "[ERR] Option '--cluster-from' is required for " 5494 "subcommand 'import'.\n"; 5495 goto invalid_args; 5496 } 5497 char *src_host[] = {config.cluster_manager_command.from}; 5498 if (!getClusterHostFromCmdArgs(1, src_host, &src_ip, &src_port)) { 5499 invalid_args_msg = "[ERR] Invalid --cluster-from host. You need to " 5500 "pass a valid address (ie. 120.0.0.1:7000).\n"; 5501 goto invalid_args; 5502 } 5503 clusterManagerLogInfo(">>> Importing data from %s:%d to cluster %s:%d\n", 5504 src_ip, src_port, ip, port); 5505 5506 clusterManagerNode *refnode = clusterManagerNewNode(ip, port); 5507 if (!clusterManagerLoadInfoFromNode(refnode, 0)) return 0; 5508 if (!clusterManagerCheckCluster(0)) return 0; 5509 char *reply_err = NULL; 5510 redisReply *src_reply = NULL; 5511 // Connect to the source node. 5512 redisContext *src_ctx = redisConnect(src_ip, src_port); 5513 if (src_ctx->err) { 5514 success = 0; 5515 fprintf(stderr,"Could not connect to Redis at %s:%d: %s.\n", src_ip, 5516 src_port, src_ctx->errstr); 5517 goto cleanup; 5518 } 5519 src_reply = reconnectingRedisCommand(src_ctx, "INFO"); 5520 if (!src_reply || src_reply->type == REDIS_REPLY_ERROR) { 5521 if (src_reply && src_reply->str) reply_err = src_reply->str; 5522 success = 0; 5523 goto cleanup; 5524 } 5525 if (getLongInfoField(src_reply->str, "cluster_enabled")) { 5526 clusterManagerLogErr("[ERR] The source node should not be a " 5527 "cluster node.\n"); 5528 success = 0; 5529 goto cleanup; 5530 } 5531 freeReplyObject(src_reply); 5532 src_reply = reconnectingRedisCommand(src_ctx, "DBSIZE"); 5533 if (!src_reply || src_reply->type == REDIS_REPLY_ERROR) { 5534 if (src_reply && src_reply->str) reply_err = src_reply->str; 5535 success = 0; 5536 goto cleanup; 5537 } 5538 int size = src_reply->integer, i; 5539 clusterManagerLogWarn("*** Importing %d keys from DB 0\n", size); 5540 5541 // Build a slot -> node map 5542 clusterManagerNode *slots_map[CLUSTER_MANAGER_SLOTS]; 5543 memset(slots_map, 0, sizeof(slots_map)); 5544 listIter li; 5545 listNode *ln; 5546 for (i = 0; i < CLUSTER_MANAGER_SLOTS; i++) { 5547 listRewind(cluster_manager.nodes, &li); 5548 while ((ln = listNext(&li)) != NULL) { 5549 clusterManagerNode *n = ln->value; 5550 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; 5551 if (n->slots_count == 0) continue; 5552 if (n->slots[i]) { 5553 slots_map[i] = n; 5554 break; 5555 } 5556 } 5557 } 5558 5559 char cmdfmt[50] = "MIGRATE %s %d %s %d %d"; 5560 if (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_COPY) 5561 strcat(cmdfmt, " %s"); 5562 if (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_REPLACE) 5563 strcat(cmdfmt, " %s"); 5564 5565 /* Use SCAN to iterate over the keys, migrating to the 5566 * right node as needed. */ 5567 int cursor = -999, timeout = config.cluster_manager_command.timeout; 5568 while (cursor != 0) { 5569 if (cursor < 0) cursor = 0; 5570 freeReplyObject(src_reply); 5571 src_reply = reconnectingRedisCommand(src_ctx, "SCAN %d COUNT %d", 5572 cursor, 1000); 5573 if (!src_reply || src_reply->type == REDIS_REPLY_ERROR) { 5574 if (src_reply && src_reply->str) reply_err = src_reply->str; 5575 success = 0; 5576 goto cleanup; 5577 } 5578 assert(src_reply->type == REDIS_REPLY_ARRAY); 5579 assert(src_reply->elements >= 2); 5580 assert(src_reply->element[1]->type == REDIS_REPLY_ARRAY); 5581 if (src_reply->element[0]->type == REDIS_REPLY_STRING) 5582 cursor = atoi(src_reply->element[0]->str); 5583 else if (src_reply->element[0]->type == REDIS_REPLY_INTEGER) 5584 cursor = src_reply->element[0]->integer; 5585 int keycount = src_reply->element[1]->elements; 5586 for (i = 0; i < keycount; i++) { 5587 redisReply *kr = src_reply->element[1]->element[i]; 5588 assert(kr->type == REDIS_REPLY_STRING); 5589 char *key = kr->str; 5590 uint16_t slot = clusterManagerKeyHashSlot(key, kr->len); 5591 clusterManagerNode *target = slots_map[slot]; 5592 printf("Migrating %s to %s:%d: ", key, target->ip, target->port); 5593 redisReply *r = reconnectingRedisCommand(src_ctx, cmdfmt, 5594 target->ip, target->port, 5595 key, 0, timeout, 5596 "COPY", "REPLACE"); 5597 if (!r || r->type == REDIS_REPLY_ERROR) { 5598 if (r && r->str) { 5599 clusterManagerLogErr("Source %s:%d replied with " 5600 "error:\n%s\n", src_ip, src_port, 5601 r->str); 5602 } 5603 success = 0; 5604 } 5605 freeReplyObject(r); 5606 if (!success) goto cleanup; 5607 clusterManagerLogOk("OK\n"); 5608 } 5609 } 5610 cleanup: 5611 if (reply_err) 5612 clusterManagerLogErr("Source %s:%d replied with error:\n%s\n", 5613 src_ip, src_port, reply_err); 5614 if (src_ctx) redisFree(src_ctx); 5615 if (src_reply) freeReplyObject(src_reply); 5616 return success; 5617 invalid_args: 5618 fprintf(stderr, "%s", invalid_args_msg); 5619 return 0; 5620 } 5621 5622 static int clusterManagerCommandCall(int argc, char **argv) { 5623 int port = 0, i; 5624 char *ip = NULL; 5625 if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args; 5626 clusterManagerNode *refnode = clusterManagerNewNode(ip, port); 5627 if (!clusterManagerLoadInfoFromNode(refnode, 0)) return 0; 5628 argc--; 5629 argv++; 5630 size_t *argvlen = zmalloc(argc*sizeof(size_t)); 5631 clusterManagerLogInfo(">>> Calling"); 5632 for (i = 0; i < argc; i++) { 5633 argvlen[i] = strlen(argv[i]); 5634 printf(" %s", argv[i]); 5635 } 5636 printf("\n"); 5637 listIter li; 5638 listNode *ln; 5639 listRewind(cluster_manager.nodes, &li); 5640 while ((ln = listNext(&li)) != NULL) { 5641 clusterManagerNode *n = ln->value; 5642 if (!n->context && !clusterManagerNodeConnect(n)) continue; 5643 redisReply *reply = NULL; 5644 redisAppendCommandArgv(n->context, argc, (const char **) argv, argvlen); 5645 int status = redisGetReply(n->context, (void **)(&reply)); 5646 if (status != REDIS_OK || reply == NULL ) 5647 printf("%s:%d: Failed!\n", n->ip, n->port); 5648 else { 5649 sds formatted_reply = cliFormatReplyRaw(reply); 5650 printf("%s:%d: %s\n", n->ip, n->port, (char *) formatted_reply); 5651 sdsfree(formatted_reply); 5652 } 5653 if (reply != NULL) freeReplyObject(reply); 5654 } 5655 zfree(argvlen); 5656 return 1; 5657 invalid_args: 5658 fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); 5659 return 0; 5660 } 5661 5662 static int clusterManagerCommandHelp(int argc, char **argv) { 5663 UNUSED(argc); 5664 UNUSED(argv); 5665 int commands_count = sizeof(clusterManagerCommands) / 5666 sizeof(clusterManagerCommandDef); 5667 int i = 0, j; 5668 fprintf(stderr, "Cluster Manager Commands:\n"); 5669 int padding = 15; 5670 for (; i < commands_count; i++) { 5671 clusterManagerCommandDef *def = &(clusterManagerCommands[i]); 5672 int namelen = strlen(def->name), padlen = padding - namelen; 5673 fprintf(stderr, " %s", def->name); 5674 for (j = 0; j < padlen; j++) fprintf(stderr, " "); 5675 fprintf(stderr, "%s\n", (def->args ? def->args : "")); 5676 if (def->options != NULL) { 5677 int optslen = strlen(def->options); 5678 char *p = def->options, *eos = p + optslen; 5679 char *comma = NULL; 5680 while ((comma = strchr(p, ',')) != NULL) { 5681 int deflen = (int)(comma - p); 5682 char buf[255]; 5683 memcpy(buf, p, deflen); 5684 buf[deflen] = '\0'; 5685 for (j = 0; j < padding; j++) fprintf(stderr, " "); 5686 fprintf(stderr, " --cluster-%s\n", buf); 5687 p = comma + 1; 5688 if (p >= eos) break; 5689 } 5690 if (p < eos) { 5691 for (j = 0; j < padding; j++) fprintf(stderr, " "); 5692 fprintf(stderr, " --cluster-%s\n", p); 5693 } 5694 } 5695 } 5696 fprintf(stderr, "\nFor check, fix, reshard, del-node, set-timeout you " 5697 "can specify the host and port of any working node in " 5698 "the cluster.\n\n"); 5699 return 0; 5700 } 5701 5702 /*------------------------------------------------------------------------------ 5703 * Latency and latency history modes 5704 *--------------------------------------------------------------------------- */ 5705 5706 static void latencyModePrint(long long min, long long max, double avg, long long count) { 5707 if (config.output == OUTPUT_STANDARD) { 5708 printf("min: %lld, max: %lld, avg: %.2f (%lld samples)", 5709 min, max, avg, count); 5710 fflush(stdout); 5711 } else if (config.output == OUTPUT_CSV) { 5712 printf("%lld,%lld,%.2f,%lld\n", min, max, avg, count); 5713 } else if (config.output == OUTPUT_RAW) { 5714 printf("%lld %lld %.2f %lld\n", min, max, avg, count); 5715 } 5716 } 5717 5718 #define LATENCY_SAMPLE_RATE 10 /* milliseconds. */ 5719 #define LATENCY_HISTORY_DEFAULT_INTERVAL 15000 /* milliseconds. */ 5720 static void latencyMode(void) { 5721 redisReply *reply; 5722 long long start, latency, min = 0, max = 0, tot = 0, count = 0; 5723 long long history_interval = 5724 config.interval ? config.interval/1000 : 5725 LATENCY_HISTORY_DEFAULT_INTERVAL; 5726 double avg; 5727 long long history_start = mstime(); 5728 5729 /* Set a default for the interval in case of --latency option 5730 * with --raw, --csv or when it is redirected to non tty. */ 5731 if (config.interval == 0) { 5732 config.interval = 1000; 5733 } else { 5734 config.interval /= 1000; /* We need to convert to milliseconds. */ 5735 } 5736 5737 if (!context) exit(1); 5738 while(1) { 5739 start = mstime(); 5740 reply = reconnectingRedisCommand(context,"PING"); 5741 if (reply == NULL) { 5742 fprintf(stderr,"\nI/O error\n"); 5743 exit(1); 5744 } 5745 latency = mstime()-start; 5746 freeReplyObject(reply); 5747 count++; 5748 if (count == 1) { 5749 min = max = tot = latency; 5750 avg = (double) latency; 5751 } else { 5752 if (latency < min) min = latency; 5753 if (latency > max) max = latency; 5754 tot += latency; 5755 avg = (double) tot/count; 5756 } 5757 5758 if (config.output == OUTPUT_STANDARD) { 5759 printf("\x1b[0G\x1b[2K"); /* Clear the line. */ 5760 latencyModePrint(min,max,avg,count); 5761 } else { 5762 if (config.latency_history) { 5763 latencyModePrint(min,max,avg,count); 5764 } else if (mstime()-history_start > config.interval) { 5765 latencyModePrint(min,max,avg,count); 5766 exit(0); 5767 } 5768 } 5769 5770 if (config.latency_history && mstime()-history_start > history_interval) 5771 { 5772 printf(" -- %.2f seconds range\n", (float)(mstime()-history_start)/1000); 5773 history_start = mstime(); 5774 min = max = tot = count = 0; 5775 } 5776 usleep(LATENCY_SAMPLE_RATE * 1000); 5777 } 5778 } 5779 5780 /*------------------------------------------------------------------------------ 5781 * Latency distribution mode -- requires 256 colors xterm 5782 *--------------------------------------------------------------------------- */ 5783 5784 #define LATENCY_DIST_DEFAULT_INTERVAL 1000 /* milliseconds. */ 5785 5786 /* Structure to store samples distribution. */ 5787 struct distsamples { 5788 long long max; /* Max latency to fit into this interval (usec). */ 5789 long long count; /* Number of samples in this interval. */ 5790 int character; /* Associated character in visualization. */ 5791 }; 5792 5793 /* Helper function for latencyDistMode(). Performs the spectrum visualization 5794 * of the collected samples targeting an xterm 256 terminal. 5795 * 5796 * Takes an array of distsamples structures, ordered from smaller to bigger 5797 * 'max' value. Last sample max must be 0, to mean that it olds all the 5798 * samples greater than the previous one, and is also the stop sentinel. 5799 * 5800 * "tot' is the total number of samples in the different buckets, so it 5801 * is the SUM(samples[i].conut) for i to 0 up to the max sample. 5802 * 5803 * As a side effect the function sets all the buckets count to 0. */ 5804 void showLatencyDistSamples(struct distsamples *samples, long long tot) { 5805 int j; 5806 5807 /* We convert samples into a index inside the palette 5808 * proportional to the percentage a given bucket represents. 5809 * This way intensity of the different parts of the spectrum 5810 * don't change relative to the number of requests, which avoids to 5811 * pollute the visualization with non-latency related info. */ 5812 printf("\033[38;5;0m"); /* Set foreground color to black. */ 5813 for (j = 0; ; j++) { 5814 int coloridx = 5815 ceil((float) samples[j].count / tot * (spectrum_palette_size-1)); 5816 int color = spectrum_palette[coloridx]; 5817 printf("\033[48;5;%dm%c", (int)color, samples[j].character); 5818 samples[j].count = 0; 5819 if (samples[j].max == 0) break; /* Last sample. */ 5820 } 5821 printf("\033[0m\n"); 5822 fflush(stdout); 5823 } 5824 5825 /* Show the legend: different buckets values and colors meaning, so 5826 * that the spectrum is more easily readable. */ 5827 void showLatencyDistLegend(void) { 5828 int j; 5829 5830 printf("---------------------------------------------\n"); 5831 printf(". - * # .01 .125 .25 .5 milliseconds\n"); 5832 printf("1,2,3,...,9 from 1 to 9 milliseconds\n"); 5833 printf("A,B,C,D,E 10,20,30,40,50 milliseconds\n"); 5834 printf("F,G,H,I,J .1,.2,.3,.4,.5 seconds\n"); 5835 printf("K,L,M,N,O,P,Q,? 1,2,4,8,16,30,60,>60 seconds\n"); 5836 printf("From 0 to 100%%: "); 5837 for (j = 0; j < spectrum_palette_size; j++) { 5838 printf("\033[48;5;%dm ", spectrum_palette[j]); 5839 } 5840 printf("\033[0m\n"); 5841 printf("---------------------------------------------\n"); 5842 } 5843 5844 static void latencyDistMode(void) { 5845 redisReply *reply; 5846 long long start, latency, count = 0; 5847 long long history_interval = 5848 config.interval ? config.interval/1000 : 5849 LATENCY_DIST_DEFAULT_INTERVAL; 5850 long long history_start = ustime(); 5851 int j, outputs = 0; 5852 5853 struct distsamples samples[] = { 5854 /* We use a mostly logarithmic scale, with certain linear intervals 5855 * which are more interesting than others, like 1-10 milliseconds 5856 * range. */ 5857 {10,0,'.'}, /* 0.01 ms */ 5858 {125,0,'-'}, /* 0.125 ms */ 5859 {250,0,'*'}, /* 0.25 ms */ 5860 {500,0,'#'}, /* 0.5 ms */ 5861 {1000,0,'1'}, /* 1 ms */ 5862 {2000,0,'2'}, /* 2 ms */ 5863 {3000,0,'3'}, /* 3 ms */ 5864 {4000,0,'4'}, /* 4 ms */ 5865 {5000,0,'5'}, /* 5 ms */ 5866 {6000,0,'6'}, /* 6 ms */ 5867 {7000,0,'7'}, /* 7 ms */ 5868 {8000,0,'8'}, /* 8 ms */ 5869 {9000,0,'9'}, /* 9 ms */ 5870 {10000,0,'A'}, /* 10 ms */ 5871 {20000,0,'B'}, /* 20 ms */ 5872 {30000,0,'C'}, /* 30 ms */ 5873 {40000,0,'D'}, /* 40 ms */ 5874 {50000,0,'E'}, /* 50 ms */ 5875 {100000,0,'F'}, /* 0.1 s */ 5876 {200000,0,'G'}, /* 0.2 s */ 5877 {300000,0,'H'}, /* 0.3 s */ 5878 {400000,0,'I'}, /* 0.4 s */ 5879 {500000,0,'J'}, /* 0.5 s */ 5880 {1000000,0,'K'}, /* 1 s */ 5881 {2000000,0,'L'}, /* 2 s */ 5882 {4000000,0,'M'}, /* 4 s */ 5883 {8000000,0,'N'}, /* 8 s */ 5884 {16000000,0,'O'}, /* 16 s */ 5885 {30000000,0,'P'}, /* 30 s */ 5886 {60000000,0,'Q'}, /* 1 minute */ 5887 {0,0,'?'}, /* > 1 minute */ 5888 }; 5889 5890 if (!context) exit(1); 5891 while(1) { 5892 start = ustime(); 5893 reply = reconnectingRedisCommand(context,"PING"); 5894 if (reply == NULL) { 5895 fprintf(stderr,"\nI/O error\n"); 5896 exit(1); 5897 } 5898 latency = ustime()-start; 5899 freeReplyObject(reply); 5900 count++; 5901 5902 /* Populate the relevant bucket. */ 5903 for (j = 0; ; j++) { 5904 if (samples[j].max == 0 || latency <= samples[j].max) { 5905 samples[j].count++; 5906 break; 5907 } 5908 } 5909 5910 /* From time to time show the spectrum. */ 5911 if (count && (ustime()-history_start)/1000 > history_interval) { 5912 if ((outputs++ % 20) == 0) 5913 showLatencyDistLegend(); 5914 showLatencyDistSamples(samples,count); 5915 history_start = ustime(); 5916 count = 0; 5917 } 5918 usleep(LATENCY_SAMPLE_RATE * 1000); 5919 } 5920 } 5921 5922 /*------------------------------------------------------------------------------ 5923 * Slave mode 5924 *--------------------------------------------------------------------------- */ 5925 5926 /* Sends SYNC and reads the number of bytes in the payload. Used both by 5927 * slaveMode() and getRDB(). */ 5928 unsigned long long sendSync(int fd) { 5929 /* To start we need to send the SYNC command and return the payload. 5930 * The hiredis client lib does not understand this part of the protocol 5931 * and we don't want to mess with its buffers, so everything is performed 5932 * using direct low-level I/O. */ 5933 char buf[4096], *p; 5934 ssize_t nread; 5935 5936 /* Send the SYNC command. */ 5937 if (write(fd,"SYNC\r\n",6) != 6) { 5938 fprintf(stderr,"Error writing to master\n"); 5939 exit(1); 5940 } 5941 5942 /* Read $<payload>\r\n, making sure to read just up to "\n" */ 5943 p = buf; 5944 while(1) { 5945 nread = read(fd,p,1); 5946 if (nread <= 0) { 5947 fprintf(stderr,"Error reading bulk length while SYNCing\n"); 5948 exit(1); 5949 } 5950 if (*p == '\n' && p != buf) break; 5951 if (*p != '\n') p++; 5952 } 5953 *p = '\0'; 5954 if (buf[0] == '-') { 5955 printf("SYNC with master failed: %s\n", buf); 5956 exit(1); 5957 } 5958 return strtoull(buf+1,NULL,10); 5959 } 5960 5961 static void slaveMode(void) { 5962 int fd = context->fd; 5963 unsigned long long payload = sendSync(fd); 5964 char buf[1024]; 5965 int original_output = config.output; 5966 5967 fprintf(stderr,"SYNC with master, discarding %llu " 5968 "bytes of bulk transfer...\n", payload); 5969 5970 /* Discard the payload. */ 5971 while(payload) { 5972 ssize_t nread; 5973 5974 nread = read(fd,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload); 5975 if (nread <= 0) { 5976 fprintf(stderr,"Error reading RDB payload while SYNCing\n"); 5977 exit(1); 5978 } 5979 payload -= nread; 5980 } 5981 fprintf(stderr,"SYNC done. Logging commands from master.\n"); 5982 5983 /* Now we can use hiredis to read the incoming protocol. */ 5984 config.output = OUTPUT_CSV; 5985 while (cliReadReply(0) == REDIS_OK); 5986 config.output = original_output; 5987 } 5988 5989 /*------------------------------------------------------------------------------ 5990 * RDB transfer mode 5991 *--------------------------------------------------------------------------- */ 5992 5993 /* This function implements --rdb, so it uses the replication protocol in order 5994 * to fetch the RDB file from a remote server. */ 5995 static void getRDB(void) { 5996 int s = context->fd; 5997 int fd; 5998 unsigned long long payload = sendSync(s); 5999 char buf[4096]; 6000 6001 fprintf(stderr,"SYNC sent to master, writing %llu bytes to '%s'\n", 6002 payload, config.rdb_filename); 6003 6004 /* Write to file. */ 6005 if (!strcmp(config.rdb_filename,"-")) { 6006 fd = STDOUT_FILENO; 6007 } else { 6008 fd = open(config.rdb_filename, O_CREAT|O_WRONLY, 0644); 6009 if (fd == -1) { 6010 fprintf(stderr, "Error opening '%s': %s\n", config.rdb_filename, 6011 strerror(errno)); 6012 exit(1); 6013 } 6014 } 6015 6016 while(payload) { 6017 ssize_t nread, nwritten; 6018 6019 nread = read(s,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload); 6020 if (nread <= 0) { 6021 fprintf(stderr,"I/O Error reading RDB payload from socket\n"); 6022 exit(1); 6023 } 6024 nwritten = write(fd, buf, nread); 6025 if (nwritten != nread) { 6026 fprintf(stderr,"Error writing data to file: %s\n", 6027 (nwritten == -1) ? strerror(errno) : "short write"); 6028 exit(1); 6029 } 6030 payload -= nread; 6031 } 6032 close(s); /* Close the file descriptor ASAP as fsync() may take time. */ 6033 fsync(fd); 6034 close(fd); 6035 fprintf(stderr,"Transfer finished with success.\n"); 6036 exit(0); 6037 } 6038 6039 /*------------------------------------------------------------------------------ 6040 * Bulk import (pipe) mode 6041 *--------------------------------------------------------------------------- */ 6042 6043 #define PIPEMODE_WRITE_LOOP_MAX_BYTES (128*1024) 6044 static void pipeMode(void) { 6045 int fd = context->fd; 6046 long long errors = 0, replies = 0, obuf_len = 0, obuf_pos = 0; 6047 char ibuf[1024*16], obuf[1024*16]; /* Input and output buffers */ 6048 char aneterr[ANET_ERR_LEN]; 6049 redisReader *reader = redisReaderCreate(); 6050 redisReply *reply; 6051 int eof = 0; /* True once we consumed all the standard input. */ 6052 int done = 0; 6053 char magic[20]; /* Special reply we recognize. */ 6054 time_t last_read_time = time(NULL); 6055 6056 srand(time(NULL)); 6057 6058 /* Use non blocking I/O. */ 6059 if (anetNonBlock(aneterr,fd) == ANET_ERR) { 6060 fprintf(stderr, "Can't set the socket in non blocking mode: %s\n", 6061 aneterr); 6062 exit(1); 6063 } 6064 6065 /* Transfer raw protocol and read replies from the server at the same 6066 * time. */ 6067 while(!done) { 6068 int mask = AE_READABLE; 6069 6070 if (!eof || obuf_len != 0) mask |= AE_WRITABLE; 6071 mask = aeWait(fd,mask,1000); 6072 6073 /* Handle the readable state: we can read replies from the server. */ 6074 if (mask & AE_READABLE) { 6075 ssize_t nread; 6076 6077 /* Read from socket and feed the hiredis reader. */ 6078 do { 6079 nread = read(fd,ibuf,sizeof(ibuf)); 6080 if (nread == -1 && errno != EAGAIN && errno != EINTR) { 6081 fprintf(stderr, "Error reading from the server: %s\n", 6082 strerror(errno)); 6083 exit(1); 6084 } 6085 if (nread > 0) { 6086 redisReaderFeed(reader,ibuf,nread); 6087 last_read_time = time(NULL); 6088 } 6089 } while(nread > 0); 6090 6091 /* Consume replies. */ 6092 do { 6093 if (redisReaderGetReply(reader,(void**)&reply) == REDIS_ERR) { 6094 fprintf(stderr, "Error reading replies from server\n"); 6095 exit(1); 6096 } 6097 if (reply) { 6098 if (reply->type == REDIS_REPLY_ERROR) { 6099 fprintf(stderr,"%s\n", reply->str); 6100 errors++; 6101 } else if (eof && reply->type == REDIS_REPLY_STRING && 6102 reply->len == 20) { 6103 /* Check if this is the reply to our final ECHO 6104 * command. If so everything was received 6105 * from the server. */ 6106 if (memcmp(reply->str,magic,20) == 0) { 6107 printf("Last reply received from server.\n"); 6108 done = 1; 6109 replies--; 6110 } 6111 } 6112 replies++; 6113 freeReplyObject(reply); 6114 } 6115 } while(reply); 6116 } 6117 6118 /* Handle the writable state: we can send protocol to the server. */ 6119 if (mask & AE_WRITABLE) { 6120 ssize_t loop_nwritten = 0; 6121 6122 while(1) { 6123 /* Transfer current buffer to server. */ 6124 if (obuf_len != 0) { 6125 ssize_t nwritten = write(fd,obuf+obuf_pos,obuf_len); 6126 6127 if (nwritten == -1) { 6128 if (errno != EAGAIN && errno != EINTR) { 6129 fprintf(stderr, "Error writing to the server: %s\n", 6130 strerror(errno)); 6131 exit(1); 6132 } else { 6133 nwritten = 0; 6134 } 6135 } 6136 obuf_len -= nwritten; 6137 obuf_pos += nwritten; 6138 loop_nwritten += nwritten; 6139 if (obuf_len != 0) break; /* Can't accept more data. */ 6140 } 6141 /* If buffer is empty, load from stdin. */ 6142 if (obuf_len == 0 && !eof) { 6143 ssize_t nread = read(STDIN_FILENO,obuf,sizeof(obuf)); 6144 6145 if (nread == 0) { 6146 /* The ECHO sequence starts with a "\r\n" so that if there 6147 * is garbage in the protocol we read from stdin, the ECHO 6148 * will likely still be properly formatted. 6149 * CRLF is ignored by Redis, so it has no effects. */ 6150 char echo[] = 6151 "\r\n*2\r\n$4\r\nECHO\r\n$20\r\n01234567890123456789\r\n"; 6152 int j; 6153 6154 eof = 1; 6155 /* Everything transferred, so we queue a special 6156 * ECHO command that we can match in the replies 6157 * to make sure everything was read from the server. */ 6158 for (j = 0; j < 20; j++) 6159 magic[j] = rand() & 0xff; 6160 memcpy(echo+21,magic,20); 6161 memcpy(obuf,echo,sizeof(echo)-1); 6162 obuf_len = sizeof(echo)-1; 6163 obuf_pos = 0; 6164 printf("All data transferred. Waiting for the last reply...\n"); 6165 } else if (nread == -1) { 6166 fprintf(stderr, "Error reading from stdin: %s\n", 6167 strerror(errno)); 6168 exit(1); 6169 } else { 6170 obuf_len = nread; 6171 obuf_pos = 0; 6172 } 6173 } 6174 if ((obuf_len == 0 && eof) || 6175 loop_nwritten > PIPEMODE_WRITE_LOOP_MAX_BYTES) break; 6176 } 6177 } 6178 6179 /* Handle timeout, that is, we reached EOF, and we are not getting 6180 * replies from the server for a few seconds, nor the final ECHO is 6181 * received. */ 6182 if (eof && config.pipe_timeout > 0 && 6183 time(NULL)-last_read_time > config.pipe_timeout) 6184 { 6185 fprintf(stderr,"No replies for %d seconds: exiting.\n", 6186 config.pipe_timeout); 6187 errors++; 6188 break; 6189 } 6190 } 6191 redisReaderFree(reader); 6192 printf("errors: %lld, replies: %lld\n", errors, replies); 6193 if (errors) 6194 exit(1); 6195 else 6196 exit(0); 6197 } 6198 6199 /*------------------------------------------------------------------------------ 6200 * Find big keys 6201 *--------------------------------------------------------------------------- */ 6202 6203 static redisReply *sendScan(unsigned long long *it) { 6204 redisReply *reply = redisCommand(context, "SCAN %llu", *it); 6205 6206 /* Handle any error conditions */ 6207 if(reply == NULL) { 6208 fprintf(stderr, "\nI/O error\n"); 6209 exit(1); 6210 } else if(reply->type == REDIS_REPLY_ERROR) { 6211 fprintf(stderr, "SCAN error: %s\n", reply->str); 6212 exit(1); 6213 } else if(reply->type != REDIS_REPLY_ARRAY) { 6214 fprintf(stderr, "Non ARRAY response from SCAN!\n"); 6215 exit(1); 6216 } else if(reply->elements != 2) { 6217 fprintf(stderr, "Invalid element count from SCAN!\n"); 6218 exit(1); 6219 } 6220 6221 /* Validate our types are correct */ 6222 assert(reply->element[0]->type == REDIS_REPLY_STRING); 6223 assert(reply->element[1]->type == REDIS_REPLY_ARRAY); 6224 6225 /* Update iterator */ 6226 *it = strtoull(reply->element[0]->str, NULL, 10); 6227 6228 return reply; 6229 } 6230 6231 static int getDbSize(void) { 6232 redisReply *reply; 6233 int size; 6234 6235 reply = redisCommand(context, "DBSIZE"); 6236 6237 if(reply == NULL || reply->type != REDIS_REPLY_INTEGER) { 6238 fprintf(stderr, "Couldn't determine DBSIZE!\n"); 6239 exit(1); 6240 } 6241 6242 /* Grab the number of keys and free our reply */ 6243 size = reply->integer; 6244 freeReplyObject(reply); 6245 6246 return size; 6247 } 6248 6249 typedef struct { 6250 char *name; 6251 char *sizecmd; 6252 char *sizeunit; 6253 unsigned long long biggest; 6254 unsigned long long count; 6255 unsigned long long totalsize; 6256 sds biggest_key; 6257 } typeinfo; 6258 6259 typeinfo type_string = { "string", "STRLEN", "bytes" }; 6260 typeinfo type_list = { "list", "LLEN", "items" }; 6261 typeinfo type_set = { "set", "SCARD", "members" }; 6262 typeinfo type_hash = { "hash", "HLEN", "fields" }; 6263 typeinfo type_zset = { "zset", "ZCARD", "members" }; 6264 typeinfo type_stream = { "stream", "XLEN", "entries" }; 6265 typeinfo type_other = { "other", NULL, "?" }; 6266 6267 static typeinfo* typeinfo_add(dict *types, char* name, typeinfo* type_template) { 6268 typeinfo *info = zmalloc(sizeof(typeinfo)); 6269 *info = *type_template; 6270 info->name = sdsnew(name); 6271 dictAdd(types, info->name, info); 6272 return info; 6273 } 6274 6275 void type_free(void* priv_data, void* val) { 6276 typeinfo *info = val; 6277 UNUSED(priv_data); 6278 if (info->biggest_key) 6279 sdsfree(info->biggest_key); 6280 sdsfree(info->name); 6281 zfree(info); 6282 } 6283 6284 static dictType typeinfoDictType = { 6285 dictSdsHash, /* hash function */ 6286 NULL, /* key dup */ 6287 NULL, /* val dup */ 6288 dictSdsKeyCompare, /* key compare */ 6289 NULL, /* key destructor (owned by the value)*/ 6290 type_free /* val destructor */ 6291 }; 6292 6293 static void getKeyTypes(dict *types_dict, redisReply *keys, typeinfo **types) { 6294 redisReply *reply; 6295 unsigned int i; 6296 6297 /* Pipeline TYPE commands */ 6298 for(i=0;i<keys->elements;i++) { 6299 redisAppendCommand(context, "TYPE %s", keys->element[i]->str); 6300 } 6301 6302 /* Retrieve types */ 6303 for(i=0;i<keys->elements;i++) { 6304 if(redisGetReply(context, (void**)&reply)!=REDIS_OK) { 6305 fprintf(stderr, "Error getting type for key '%s' (%d: %s)\n", 6306 keys->element[i]->str, context->err, context->errstr); 6307 exit(1); 6308 } else if(reply->type != REDIS_REPLY_STATUS) { 6309 if(reply->type == REDIS_REPLY_ERROR) { 6310 fprintf(stderr, "TYPE returned an error: %s\n", reply->str); 6311 } else { 6312 fprintf(stderr, 6313 "Invalid reply type (%d) for TYPE on key '%s'!\n", 6314 reply->type, keys->element[i]->str); 6315 } 6316 exit(1); 6317 } 6318 6319 sds typereply = sdsnew(reply->str); 6320 dictEntry *de = dictFind(types_dict, typereply); 6321 sdsfree(typereply); 6322 typeinfo *type = NULL; 6323 if (de) 6324 type = dictGetVal(de); 6325 else if (strcmp(reply->str, "none")) /* create new types for modules, (but not for deleted keys) */ 6326 type = typeinfo_add(types_dict, reply->str, &type_other); 6327 types[i] = type; 6328 freeReplyObject(reply); 6329 } 6330 } 6331 6332 static void getKeySizes(redisReply *keys, typeinfo **types, 6333 unsigned long long *sizes, int memkeys, 6334 unsigned memkeys_samples) 6335 { 6336 redisReply *reply; 6337 unsigned int i; 6338 6339 /* Pipeline size commands */ 6340 for(i=0;i<keys->elements;i++) { 6341 /* Skip keys that disappeared between SCAN and TYPE (or unknown types when not in memkeys mode) */ 6342 if(!types[i] || (!types[i]->sizecmd && !memkeys)) 6343 continue; 6344 6345 if (!memkeys) 6346 redisAppendCommand(context, "%s %s", 6347 types[i]->sizecmd, keys->element[i]->str); 6348 else if (memkeys_samples==0) 6349 redisAppendCommand(context, "%s %s %s", 6350 "MEMORY", "USAGE", keys->element[i]->str); 6351 else 6352 redisAppendCommand(context, "%s %s %s SAMPLES %u", 6353 "MEMORY", "USAGE", keys->element[i]->str, memkeys_samples); 6354 } 6355 6356 /* Retrieve sizes */ 6357 for(i=0;i<keys->elements;i++) { 6358 /* Skip keys that disappeared between SCAN and TYPE (or unknown types when not in memkeys mode) */ 6359 if(!types[i] || (!types[i]->sizecmd && !memkeys)) { 6360 sizes[i] = 0; 6361 continue; 6362 } 6363 6364 /* Retrieve size */ 6365 if(redisGetReply(context, (void**)&reply)!=REDIS_OK) { 6366 fprintf(stderr, "Error getting size for key '%s' (%d: %s)\n", 6367 keys->element[i]->str, context->err, context->errstr); 6368 exit(1); 6369 } else if(reply->type != REDIS_REPLY_INTEGER) { 6370 /* Theoretically the key could have been removed and 6371 * added as a different type between TYPE and SIZE */ 6372 fprintf(stderr, 6373 "Warning: %s on '%s' failed (may have changed type)\n", 6374 !memkeys? types[i]->sizecmd: "MEMORY USAGE", 6375 keys->element[i]->str); 6376 sizes[i] = 0; 6377 } else { 6378 sizes[i] = reply->integer; 6379 } 6380 6381 freeReplyObject(reply); 6382 } 6383 } 6384 6385 static void findBigKeys(int memkeys, unsigned memkeys_samples) { 6386 unsigned long long sampled = 0, total_keys, totlen=0, *sizes=NULL, it=0; 6387 redisReply *reply, *keys; 6388 unsigned int arrsize=0, i; 6389 dictIterator *di; 6390 dictEntry *de; 6391 typeinfo **types = NULL; 6392 double pct; 6393 6394 dict *types_dict = dictCreate(&typeinfoDictType, NULL); 6395 typeinfo_add(types_dict, "string", &type_string); 6396 typeinfo_add(types_dict, "list", &type_list); 6397 typeinfo_add(types_dict, "set", &type_set); 6398 typeinfo_add(types_dict, "hash", &type_hash); 6399 typeinfo_add(types_dict, "zset", &type_zset); 6400 typeinfo_add(types_dict, "stream", &type_stream); 6401 6402 /* Total keys pre scanning */ 6403 total_keys = getDbSize(); 6404 6405 /* Status message */ 6406 printf("\n# Scanning the entire keyspace to find biggest keys as well as\n"); 6407 printf("# average sizes per key type. You can use -i 0.1 to sleep 0.1 sec\n"); 6408 printf("# per 100 SCAN commands (not usually needed).\n\n"); 6409 6410 /* SCAN loop */ 6411 do { 6412 /* Calculate approximate percentage completion */ 6413 pct = 100 * (double)sampled/total_keys; 6414 6415 /* Grab some keys and point to the keys array */ 6416 reply = sendScan(&it); 6417 keys = reply->element[1]; 6418 6419 /* Reallocate our type and size array if we need to */ 6420 if(keys->elements > arrsize) { 6421 types = zrealloc(types, sizeof(typeinfo*)*keys->elements); 6422 sizes = zrealloc(sizes, sizeof(unsigned long long)*keys->elements); 6423 6424 if(!types || !sizes) { 6425 fprintf(stderr, "Failed to allocate storage for keys!\n"); 6426 exit(1); 6427 } 6428 6429 arrsize = keys->elements; 6430 } 6431 6432 /* Retrieve types and then sizes */ 6433 getKeyTypes(types_dict, keys, types); 6434 getKeySizes(keys, types, sizes, memkeys, memkeys_samples); 6435 6436 /* Now update our stats */ 6437 for(i=0;i<keys->elements;i++) { 6438 typeinfo *type = types[i]; 6439 /* Skip keys that disappeared between SCAN and TYPE */ 6440 if(!type) 6441 continue; 6442 6443 type->totalsize += sizes[i]; 6444 type->count++; 6445 totlen += keys->element[i]->len; 6446 sampled++; 6447 6448 if(type->biggest<sizes[i]) { 6449 printf( 6450 "[%05.2f%%] Biggest %-6s found so far '%s' with %llu %s\n", 6451 pct, type->name, keys->element[i]->str, sizes[i], 6452 !memkeys? type->sizeunit: "bytes"); 6453 6454 /* Keep track of biggest key name for this type */ 6455 if (type->biggest_key) 6456 sdsfree(type->biggest_key); 6457 type->biggest_key = sdsnew(keys->element[i]->str); 6458 if(!type->biggest_key) { 6459 fprintf(stderr, "Failed to allocate memory for key!\n"); 6460 exit(1); 6461 } 6462 6463 /* Keep track of the biggest size for this type */ 6464 type->biggest = sizes[i]; 6465 } 6466 6467 /* Update overall progress */ 6468 if(sampled % 1000000 == 0) { 6469 printf("[%05.2f%%] Sampled %llu keys so far\n", pct, sampled); 6470 } 6471 } 6472 6473 /* Sleep if we've been directed to do so */ 6474 if(sampled && (sampled %100) == 0 && config.interval) { 6475 usleep(config.interval); 6476 } 6477 6478 freeReplyObject(reply); 6479 } while(it != 0); 6480 6481 if(types) zfree(types); 6482 if(sizes) zfree(sizes); 6483 6484 /* We're done */ 6485 printf("\n-------- summary -------\n\n"); 6486 6487 printf("Sampled %llu keys in the keyspace!\n", sampled); 6488 printf("Total key length in bytes is %llu (avg len %.2f)\n\n", 6489 totlen, totlen ? (double)totlen/sampled : 0); 6490 6491 /* Output the biggest keys we found, for types we did find */ 6492 di = dictGetIterator(types_dict); 6493 while ((de = dictNext(di))) { 6494 typeinfo *type = dictGetVal(de); 6495 if(type->biggest_key) { 6496 printf("Biggest %6s found '%s' has %llu %s\n", type->name, type->biggest_key, 6497 type->biggest, !memkeys? type->sizeunit: "bytes"); 6498 } 6499 } 6500 dictReleaseIterator(di); 6501 6502 printf("\n"); 6503 6504 di = dictGetIterator(types_dict); 6505 while ((de = dictNext(di))) { 6506 typeinfo *type = dictGetVal(de); 6507 printf("%llu %ss with %llu %s (%05.2f%% of keys, avg size %.2f)\n", 6508 type->count, type->name, type->totalsize, !memkeys? type->sizeunit: "bytes", 6509 sampled ? 100 * (double)type->count/sampled : 0, 6510 type->count ? (double)type->totalsize/type->count : 0); 6511 } 6512 dictReleaseIterator(di); 6513 6514 dictRelease(types_dict); 6515 6516 /* Success! */ 6517 exit(0); 6518 } 6519 6520 static void getKeyFreqs(redisReply *keys, unsigned long long *freqs) { 6521 redisReply *reply; 6522 unsigned int i; 6523 6524 /* Pipeline OBJECT freq commands */ 6525 for(i=0;i<keys->elements;i++) { 6526 redisAppendCommand(context, "OBJECT freq %s", keys->element[i]->str); 6527 } 6528 6529 /* Retrieve freqs */ 6530 for(i=0;i<keys->elements;i++) { 6531 if(redisGetReply(context, (void**)&reply)!=REDIS_OK) { 6532 fprintf(stderr, "Error getting freq for key '%s' (%d: %s)\n", 6533 keys->element[i]->str, context->err, context->errstr); 6534 exit(1); 6535 } else if(reply->type != REDIS_REPLY_INTEGER) { 6536 if(reply->type == REDIS_REPLY_ERROR) { 6537 fprintf(stderr, "Error: %s\n", reply->str); 6538 exit(1); 6539 } else { 6540 fprintf(stderr, "Warning: OBJECT freq on '%s' failed (may have been deleted)\n", keys->element[i]->str); 6541 freqs[i] = 0; 6542 } 6543 } else { 6544 freqs[i] = reply->integer; 6545 } 6546 freeReplyObject(reply); 6547 } 6548 } 6549 6550 #define HOTKEYS_SAMPLE 16 6551 static void findHotKeys(void) { 6552 redisReply *keys, *reply; 6553 unsigned long long counters[HOTKEYS_SAMPLE] = {0}; 6554 sds hotkeys[HOTKEYS_SAMPLE] = {NULL}; 6555 unsigned long long sampled = 0, total_keys, *freqs = NULL, it = 0; 6556 unsigned int arrsize = 0, i, k; 6557 double pct; 6558 6559 /* Total keys pre scanning */ 6560 total_keys = getDbSize(); 6561 6562 /* Status message */ 6563 printf("\n# Scanning the entire keyspace to find hot keys as well as\n"); 6564 printf("# average sizes per key type. You can use -i 0.1 to sleep 0.1 sec\n"); 6565 printf("# per 100 SCAN commands (not usually needed).\n\n"); 6566 6567 /* SCAN loop */ 6568 do { 6569 /* Calculate approximate percentage completion */ 6570 pct = 100 * (double)sampled/total_keys; 6571 6572 /* Grab some keys and point to the keys array */ 6573 reply = sendScan(&it); 6574 keys = reply->element[1]; 6575 6576 /* Reallocate our freqs array if we need to */ 6577 if(keys->elements > arrsize) { 6578 freqs = zrealloc(freqs, sizeof(unsigned long long)*keys->elements); 6579 6580 if(!freqs) { 6581 fprintf(stderr, "Failed to allocate storage for keys!\n"); 6582 exit(1); 6583 } 6584 6585 arrsize = keys->elements; 6586 } 6587 6588 getKeyFreqs(keys, freqs); 6589 6590 /* Now update our stats */ 6591 for(i=0;i<keys->elements;i++) { 6592 sampled++; 6593 /* Update overall progress */ 6594 if(sampled % 1000000 == 0) { 6595 printf("[%05.2f%%] Sampled %llu keys so far\n", pct, sampled); 6596 } 6597 6598 /* Use eviction pool here */ 6599 k = 0; 6600 while (k < HOTKEYS_SAMPLE && freqs[i] > counters[k]) k++; 6601 if (k == 0) continue; 6602 k--; 6603 if (k == 0 || counters[k] == 0) { 6604 sdsfree(hotkeys[k]); 6605 } else { 6606 sdsfree(hotkeys[0]); 6607 memmove(counters,counters+1,sizeof(counters[0])*k); 6608 memmove(hotkeys,hotkeys+1,sizeof(hotkeys[0])*k); 6609 } 6610 counters[k] = freqs[i]; 6611 hotkeys[k] = sdsnew(keys->element[i]->str); 6612 printf( 6613 "[%05.2f%%] Hot key '%s' found so far with counter %llu\n", 6614 pct, keys->element[i]->str, freqs[i]); 6615 } 6616 6617 /* Sleep if we've been directed to do so */ 6618 if(sampled && (sampled %100) == 0 && config.interval) { 6619 usleep(config.interval); 6620 } 6621 6622 freeReplyObject(reply); 6623 } while(it != 0); 6624 6625 if (freqs) zfree(freqs); 6626 6627 /* We're done */ 6628 printf("\n-------- summary -------\n\n"); 6629 6630 printf("Sampled %llu keys in the keyspace!\n", sampled); 6631 6632 for (i=1; i<= HOTKEYS_SAMPLE; i++) { 6633 k = HOTKEYS_SAMPLE - i; 6634 if(counters[k]>0) { 6635 printf("hot key found with counter: %llu\tkeyname: %s\n", counters[k], hotkeys[k]); 6636 sdsfree(hotkeys[k]); 6637 } 6638 } 6639 6640 exit(0); 6641 } 6642 6643 /*------------------------------------------------------------------------------ 6644 * Stats mode 6645 *--------------------------------------------------------------------------- */ 6646 6647 /* Return the specified INFO field from the INFO command output "info". 6648 * A new buffer is allocated for the result, that needs to be free'd. 6649 * If the field is not found NULL is returned. */ 6650 static char *getInfoField(char *info, char *field) { 6651 char *p = strstr(info,field); 6652 char *n1, *n2; 6653 char *result; 6654 6655 if (!p) return NULL; 6656 p += strlen(field)+1; 6657 n1 = strchr(p,'\r'); 6658 n2 = strchr(p,','); 6659 if (n2 && n2 < n1) n1 = n2; 6660 result = zmalloc(sizeof(char)*(n1-p)+1); 6661 memcpy(result,p,(n1-p)); 6662 result[n1-p] = '\0'; 6663 return result; 6664 } 6665 6666 /* Like the above function but automatically convert the result into 6667 * a long. On error (missing field) LONG_MIN is returned. */ 6668 static long getLongInfoField(char *info, char *field) { 6669 char *value = getInfoField(info,field); 6670 long l; 6671 6672 if (!value) return LONG_MIN; 6673 l = strtol(value,NULL,10); 6674 zfree(value); 6675 return l; 6676 } 6677 6678 /* Convert number of bytes into a human readable string of the form: 6679 * 100B, 2G, 100M, 4K, and so forth. */ 6680 void bytesToHuman(char *s, long long n) { 6681 double d; 6682 6683 if (n < 0) { 6684 *s = '-'; 6685 s++; 6686 n = -n; 6687 } 6688 if (n < 1024) { 6689 /* Bytes */ 6690 sprintf(s,"%lldB",n); 6691 return; 6692 } else if (n < (1024*1024)) { 6693 d = (double)n/(1024); 6694 sprintf(s,"%.2fK",d); 6695 } else if (n < (1024LL*1024*1024)) { 6696 d = (double)n/(1024*1024); 6697 sprintf(s,"%.2fM",d); 6698 } else if (n < (1024LL*1024*1024*1024)) { 6699 d = (double)n/(1024LL*1024*1024); 6700 sprintf(s,"%.2fG",d); 6701 } 6702 } 6703 6704 static void statMode(void) { 6705 redisReply *reply; 6706 long aux, requests = 0; 6707 int i = 0; 6708 6709 while(1) { 6710 char buf[64]; 6711 int j; 6712 6713 reply = reconnectingRedisCommand(context,"INFO"); 6714 if (reply->type == REDIS_REPLY_ERROR) { 6715 printf("ERROR: %s\n", reply->str); 6716 exit(1); 6717 } 6718 6719 if ((i++ % 20) == 0) { 6720 printf( 6721 "------- data ------ --------------------- load -------------------- - child -\n" 6722 "keys mem clients blocked requests connections \n"); 6723 } 6724 6725 /* Keys */ 6726 aux = 0; 6727 for (j = 0; j < 20; j++) { 6728 long k; 6729 6730 sprintf(buf,"db%d:keys",j); 6731 k = getLongInfoField(reply->str,buf); 6732 if (k == LONG_MIN) continue; 6733 aux += k; 6734 } 6735 sprintf(buf,"%ld",aux); 6736 printf("%-11s",buf); 6737 6738 /* Used memory */ 6739 aux = getLongInfoField(reply->str,"used_memory"); 6740 bytesToHuman(buf,aux); 6741 printf("%-8s",buf); 6742 6743 /* Clients */ 6744 aux = getLongInfoField(reply->str,"connected_clients"); 6745 sprintf(buf,"%ld",aux); 6746 printf(" %-8s",buf); 6747 6748 /* Blocked (BLPOPPING) Clients */ 6749 aux = getLongInfoField(reply->str,"blocked_clients"); 6750 sprintf(buf,"%ld",aux); 6751 printf("%-8s",buf); 6752 6753 /* Requests */ 6754 aux = getLongInfoField(reply->str,"total_commands_processed"); 6755 sprintf(buf,"%ld (+%ld)",aux,requests == 0 ? 0 : aux-requests); 6756 printf("%-19s",buf); 6757 requests = aux; 6758 6759 /* Connections */ 6760 aux = getLongInfoField(reply->str,"total_connections_received"); 6761 sprintf(buf,"%ld",aux); 6762 printf(" %-12s",buf); 6763 6764 /* Children */ 6765 aux = getLongInfoField(reply->str,"bgsave_in_progress"); 6766 aux |= getLongInfoField(reply->str,"aof_rewrite_in_progress") << 1; 6767 aux |= getLongInfoField(reply->str,"loading") << 2; 6768 switch(aux) { 6769 case 0: break; 6770 case 1: 6771 printf("SAVE"); 6772 break; 6773 case 2: 6774 printf("AOF"); 6775 break; 6776 case 3: 6777 printf("SAVE+AOF"); 6778 break; 6779 case 4: 6780 printf("LOAD"); 6781 break; 6782 } 6783 6784 printf("\n"); 6785 freeReplyObject(reply); 6786 usleep(config.interval); 6787 } 6788 } 6789 6790 /*------------------------------------------------------------------------------ 6791 * Scan mode 6792 *--------------------------------------------------------------------------- */ 6793 6794 static void scanMode(void) { 6795 redisReply *reply; 6796 unsigned long long cur = 0; 6797 6798 do { 6799 if (config.pattern) 6800 reply = redisCommand(context,"SCAN %llu MATCH %s", 6801 cur,config.pattern); 6802 else 6803 reply = redisCommand(context,"SCAN %llu",cur); 6804 if (reply == NULL) { 6805 printf("I/O error\n"); 6806 exit(1); 6807 } else if (reply->type == REDIS_REPLY_ERROR) { 6808 printf("ERROR: %s\n", reply->str); 6809 exit(1); 6810 } else { 6811 unsigned int j; 6812 6813 cur = strtoull(reply->element[0]->str,NULL,10); 6814 for (j = 0; j < reply->element[1]->elements; j++) 6815 printf("%s\n", reply->element[1]->element[j]->str); 6816 } 6817 freeReplyObject(reply); 6818 } while(cur != 0); 6819 6820 exit(0); 6821 } 6822 6823 /*------------------------------------------------------------------------------ 6824 * LRU test mode 6825 *--------------------------------------------------------------------------- */ 6826 6827 /* Return an integer from min to max (both inclusive) using a power-law 6828 * distribution, depending on the value of alpha: the greater the alpha 6829 * the more bias towards lower values. 6830 * 6831 * With alpha = 6.2 the output follows the 80-20 rule where 20% of 6832 * the returned numbers will account for 80% of the frequency. */ 6833 long long powerLawRand(long long min, long long max, double alpha) { 6834 double pl, r; 6835 6836 max += 1; 6837 r = ((double)rand()) / RAND_MAX; 6838 pl = pow( 6839 ((pow(max,alpha+1) - pow(min,alpha+1))*r + pow(min,alpha+1)), 6840 (1.0/(alpha+1))); 6841 return (max-1-(long long)pl)+min; 6842 } 6843 6844 /* Generates a key name among a set of lru_test_sample_size keys, using 6845 * an 80-20 distribution. */ 6846 void LRUTestGenKey(char *buf, size_t buflen) { 6847 snprintf(buf, buflen, "lru:%lld", 6848 powerLawRand(1, config.lru_test_sample_size, 6.2)); 6849 } 6850 6851 #define LRU_CYCLE_PERIOD 1000 /* 1000 milliseconds. */ 6852 #define LRU_CYCLE_PIPELINE_SIZE 250 6853 static void LRUTestMode(void) { 6854 redisReply *reply; 6855 char key[128]; 6856 long long start_cycle; 6857 int j; 6858 6859 srand(time(NULL)^getpid()); 6860 while(1) { 6861 /* Perform cycles of 1 second with 50% writes and 50% reads. 6862 * We use pipelining batching writes / reads N times per cycle in order 6863 * to fill the target instance easily. */ 6864 start_cycle = mstime(); 6865 long long hits = 0, misses = 0; 6866 while(mstime() - start_cycle < 1000) { 6867 /* Write cycle. */ 6868 for (j = 0; j < LRU_CYCLE_PIPELINE_SIZE; j++) { 6869 char val[6]; 6870 val[5] = '\0'; 6871 for (int i = 0; i < 5; i++) val[i] = 'A'+rand()%('z'-'A'); 6872 LRUTestGenKey(key,sizeof(key)); 6873 redisAppendCommand(context, "SET %s %s",key,val); 6874 } 6875 for (j = 0; j < LRU_CYCLE_PIPELINE_SIZE; j++) 6876 redisGetReply(context, (void**)&reply); 6877 6878 /* Read cycle. */ 6879 for (j = 0; j < LRU_CYCLE_PIPELINE_SIZE; j++) { 6880 LRUTestGenKey(key,sizeof(key)); 6881 redisAppendCommand(context, "GET %s",key); 6882 } 6883 for (j = 0; j < LRU_CYCLE_PIPELINE_SIZE; j++) { 6884 if (redisGetReply(context, (void**)&reply) == REDIS_OK) { 6885 switch(reply->type) { 6886 case REDIS_REPLY_ERROR: 6887 printf("%s\n", reply->str); 6888 break; 6889 case REDIS_REPLY_NIL: 6890 misses++; 6891 break; 6892 default: 6893 hits++; 6894 break; 6895 } 6896 } 6897 } 6898 6899 if (context->err) { 6900 fprintf(stderr,"I/O error during LRU test\n"); 6901 exit(1); 6902 } 6903 } 6904 /* Print stats. */ 6905 printf( 6906 "%lld Gets/sec | Hits: %lld (%.2f%%) | Misses: %lld (%.2f%%)\n", 6907 hits+misses, 6908 hits, (double)hits/(hits+misses)*100, 6909 misses, (double)misses/(hits+misses)*100); 6910 } 6911 exit(0); 6912 } 6913 6914 /*------------------------------------------------------------------------------ 6915 * Intrisic latency mode. 6916 * 6917 * Measure max latency of a running process that does not result from 6918 * syscalls. Basically this software should provide an hint about how much 6919 * time the kernel leaves the process without a chance to run. 6920 *--------------------------------------------------------------------------- */ 6921 6922 /* This is just some computation the compiler can't optimize out. 6923 * Should run in less than 100-200 microseconds even using very 6924 * slow hardware. Runs in less than 10 microseconds in modern HW. */ 6925 unsigned long compute_something_fast(void) { 6926 unsigned char s[256], i, j, t; 6927 int count = 1000, k; 6928 unsigned long output = 0; 6929 6930 for (k = 0; k < 256; k++) s[k] = k; 6931 6932 i = 0; 6933 j = 0; 6934 while(count--) { 6935 i++; 6936 j = j + s[i]; 6937 t = s[i]; 6938 s[i] = s[j]; 6939 s[j] = t; 6940 output += s[(s[i]+s[j])&255]; 6941 } 6942 return output; 6943 } 6944 6945 static void intrinsicLatencyModeStop(int s) { 6946 UNUSED(s); 6947 force_cancel_loop = 1; 6948 } 6949 6950 static void intrinsicLatencyMode(void) { 6951 long long test_end, run_time, max_latency = 0, runs = 0; 6952 6953 run_time = config.intrinsic_latency_duration*1000000; 6954 test_end = ustime() + run_time; 6955 signal(SIGINT, intrinsicLatencyModeStop); 6956 6957 while(1) { 6958 long long start, end, latency; 6959 6960 start = ustime(); 6961 compute_something_fast(); 6962 end = ustime(); 6963 latency = end-start; 6964 runs++; 6965 if (latency <= 0) continue; 6966 6967 /* Reporting */ 6968 if (latency > max_latency) { 6969 max_latency = latency; 6970 printf("Max latency so far: %lld microseconds.\n", max_latency); 6971 } 6972 6973 double avg_us = (double)run_time/runs; 6974 double avg_ns = avg_us * 1e3; 6975 if (force_cancel_loop || end > test_end) { 6976 printf("\n%lld total runs " 6977 "(avg latency: " 6978 "%.4f microseconds / %.2f nanoseconds per run).\n", 6979 runs, avg_us, avg_ns); 6980 printf("Worst run took %.0fx longer than the average latency.\n", 6981 max_latency / avg_us); 6982 exit(0); 6983 } 6984 } 6985 } 6986 6987 /*------------------------------------------------------------------------------ 6988 * Program main() 6989 *--------------------------------------------------------------------------- */ 6990 6991 int main(int argc, char **argv) { 6992 int firstarg; 6993 6994 config.hostip = sdsnew("127.0.0.1"); 6995 config.hostport = 6379; 6996 config.hostsocket = NULL; 6997 config.repeat = 1; 6998 config.interval = 0; 6999 config.dbnum = 0; 7000 config.interactive = 0; 7001 config.shutdown = 0; 7002 config.monitor_mode = 0; 7003 config.pubsub_mode = 0; 7004 config.latency_mode = 0; 7005 config.latency_dist_mode = 0; 7006 config.latency_history = 0; 7007 config.lru_test_mode = 0; 7008 config.lru_test_sample_size = 0; 7009 config.cluster_mode = 0; 7010 config.slave_mode = 0; 7011 config.getrdb_mode = 0; 7012 config.stat_mode = 0; 7013 config.scan_mode = 0; 7014 config.intrinsic_latency_mode = 0; 7015 config.pattern = NULL; 7016 config.rdb_filename = NULL; 7017 config.pipe_mode = 0; 7018 config.pipe_timeout = REDIS_CLI_DEFAULT_PIPE_TIMEOUT; 7019 config.bigkeys = 0; 7020 config.hotkeys = 0; 7021 config.stdinarg = 0; 7022 config.auth = NULL; 7023 config.eval = NULL; 7024 config.eval_ldb = 0; 7025 config.eval_ldb_end = 0; 7026 config.eval_ldb_sync = 0; 7027 config.enable_ldb_on_eval = 0; 7028 config.last_cmd_type = -1; 7029 config.verbose = 0; 7030 config.no_auth_warning = 0; 7031 config.cluster_manager_command.name = NULL; 7032 config.cluster_manager_command.argc = 0; 7033 config.cluster_manager_command.argv = NULL; 7034 config.cluster_manager_command.flags = 0; 7035 config.cluster_manager_command.replicas = 0; 7036 config.cluster_manager_command.from = NULL; 7037 config.cluster_manager_command.to = NULL; 7038 config.cluster_manager_command.weight = NULL; 7039 config.cluster_manager_command.weight_argc = 0; 7040 config.cluster_manager_command.slots = 0; 7041 config.cluster_manager_command.timeout = CLUSTER_MANAGER_MIGRATE_TIMEOUT; 7042 config.cluster_manager_command.pipeline = CLUSTER_MANAGER_MIGRATE_PIPELINE; 7043 config.cluster_manager_command.threshold = 7044 CLUSTER_MANAGER_REBALANCE_THRESHOLD; 7045 pref.hints = 1; 7046 7047 spectrum_palette = spectrum_palette_color; 7048 spectrum_palette_size = spectrum_palette_color_size; 7049 7050 if (!isatty(fileno(stdout)) && (getenv("FAKETTY") == NULL)) 7051 config.output = OUTPUT_RAW; 7052 else 7053 config.output = OUTPUT_STANDARD; 7054 config.mb_delim = sdsnew("\n"); 7055 7056 firstarg = parseOptions(argc,argv); 7057 argc -= firstarg; 7058 argv += firstarg; 7059 7060 parseEnv(); 7061 7062 /* Cluster Manager mode */ 7063 if (CLUSTER_MANAGER_MODE()) { 7064 clusterManagerCommandProc *proc = validateClusterManagerCommand(); 7065 if (!proc) { 7066 sdsfree(config.hostip); 7067 sdsfree(config.mb_delim); 7068 exit(1); 7069 } 7070 clusterManagerMode(proc); 7071 } 7072 7073 /* Latency mode */ 7074 if (config.latency_mode) { 7075 if (cliConnect(0) == REDIS_ERR) exit(1); 7076 latencyMode(); 7077 } 7078 7079 /* Latency distribution mode */ 7080 if (config.latency_dist_mode) { 7081 if (cliConnect(0) == REDIS_ERR) exit(1); 7082 latencyDistMode(); 7083 } 7084 7085 /* Slave mode */ 7086 if (config.slave_mode) { 7087 if (cliConnect(0) == REDIS_ERR) exit(1); 7088 slaveMode(); 7089 } 7090 7091 /* Get RDB mode. */ 7092 if (config.getrdb_mode) { 7093 if (cliConnect(0) == REDIS_ERR) exit(1); 7094 getRDB(); 7095 } 7096 7097 /* Pipe mode */ 7098 if (config.pipe_mode) { 7099 if (cliConnect(0) == REDIS_ERR) exit(1); 7100 pipeMode(); 7101 } 7102 7103 /* Find big keys */ 7104 if (config.bigkeys) { 7105 if (cliConnect(0) == REDIS_ERR) exit(1); 7106 findBigKeys(0, 0); 7107 } 7108 7109 /* Find large keys */ 7110 if (config.memkeys) { 7111 if (cliConnect(0) == REDIS_ERR) exit(1); 7112 findBigKeys(1, config.memkeys_samples); 7113 } 7114 7115 /* Find hot keys */ 7116 if (config.hotkeys) { 7117 if (cliConnect(0) == REDIS_ERR) exit(1); 7118 findHotKeys(); 7119 } 7120 7121 /* Stat mode */ 7122 if (config.stat_mode) { 7123 if (cliConnect(0) == REDIS_ERR) exit(1); 7124 if (config.interval == 0) config.interval = 1000000; 7125 statMode(); 7126 } 7127 7128 /* Scan mode */ 7129 if (config.scan_mode) { 7130 if (cliConnect(0) == REDIS_ERR) exit(1); 7131 scanMode(); 7132 } 7133 7134 /* LRU test mode */ 7135 if (config.lru_test_mode) { 7136 if (cliConnect(0) == REDIS_ERR) exit(1); 7137 LRUTestMode(); 7138 } 7139 7140 /* Intrinsic latency mode */ 7141 if (config.intrinsic_latency_mode) intrinsicLatencyMode(); 7142 7143 /* Start interactive mode when no command is provided */ 7144 if (argc == 0 && !config.eval) { 7145 /* Ignore SIGPIPE in interactive mode to force a reconnect */ 7146 signal(SIGPIPE, SIG_IGN); 7147 7148 /* Note that in repl mode we don't abort on connection error. 7149 * A new attempt will be performed for every command send. */ 7150 cliConnect(0); 7151 repl(); 7152 } 7153 7154 /* Otherwise, we have some arguments to execute */ 7155 if (cliConnect(0) != REDIS_OK) exit(1); 7156 if (config.eval) { 7157 return evalMode(argc,argv); 7158 } else { 7159 return noninteractive(argc,convertToSds(argc,argv)); 7160 } 7161 } 7162