1 /* 2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * * Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * * Neither the name of Redis nor the names of its contributors may be used 14 * to endorse or promote products derived from this software without 15 * specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27 * POSSIBILITY OF SUCH DAMAGE. 
28 */ 29 30 #include "server.h" 31 #include "cluster.h" 32 #include "atomicvar.h" 33 34 #include <signal.h> 35 #include <ctype.h> 36 37 /*----------------------------------------------------------------------------- 38 * C-level DB API 39 *----------------------------------------------------------------------------*/ 40 41 int keyIsExpired(redisDb *db, robj *key); 42 43 /* Update LFU when an object is accessed. 44 * Firstly, decrement the counter if the decrement time is reached. 45 * Then logarithmically increment the counter, and update the access time. */ 46 void updateLFU(robj *val) { 47 unsigned long counter = LFUDecrAndReturn(val); 48 counter = LFULogIncr(counter); 49 val->lru = (LFUGetTimeInMinutes()<<8) | counter; 50 } 51 52 /* Low level key lookup API, not actually called directly from commands 53 * implementations that should instead rely on lookupKeyRead(), 54 * lookupKeyWrite() and lookupKeyReadWithFlags(). */ 55 robj *lookupKey(redisDb *db, robj *key, int flags) { 56 dictEntry *de = dictFind(db->dict,key->ptr); 57 if (de) { 58 robj *val = dictGetVal(de); 59 60 /* Update the access time for the ageing algorithm. 61 * Don't do it if we have a saving child, as this will trigger 62 * a copy on write madness. */ 63 if (server.rdb_child_pid == -1 && 64 server.aof_child_pid == -1 && 65 !(flags & LOOKUP_NOTOUCH)) 66 { 67 if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { 68 updateLFU(val); 69 } else { 70 val->lru = LRU_CLOCK(); 71 } 72 } 73 return val; 74 } else { 75 return NULL; 76 } 77 } 78 79 /* Lookup a key for read operations, or return NULL if the key is not found 80 * in the specified DB. 81 * 82 * As a side effect of calling this function: 83 * 1. A key gets expired if it reached it's TTL. 84 * 2. The key last access time is updated. 85 * 3. The global keys hits/misses stats are updated (reported in INFO). 
86 * 87 * This API should not be used when we write to the key after obtaining 88 * the object linked to the key, but only for read only operations. 89 * 90 * Flags change the behavior of this command: 91 * 92 * LOOKUP_NONE (or zero): no special flags are passed. 93 * LOOKUP_NOTOUCH: don't alter the last access time of the key. 94 * 95 * Note: this function also returns NULL if the key is logically expired 96 * but still existing, in case this is a slave, since this API is called only 97 * for read operations. Even if the key expiry is master-driven, we can 98 * correctly report a key is expired on slaves even if the master is lagging 99 * expiring our key via DELs in the replication link. */ 100 robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { 101 robj *val; 102 103 if (expireIfNeeded(db,key) == 1) { 104 /* Key expired. If we are in the context of a master, expireIfNeeded() 105 * returns 0 only when the key does not exist at all, so it's safe 106 * to return NULL ASAP. */ 107 if (server.masterhost == NULL) { 108 server.stat_keyspace_misses++; 109 return NULL; 110 } 111 112 /* However if we are in the context of a slave, expireIfNeeded() will 113 * not really try to expire the key, it only returns information 114 * about the "logical" status of the key: key expiring is up to the 115 * master in order to have a consistent view of master's data set. 116 * 117 * However, if the command caller is not the master, and as additional 118 * safety measure, the command invoked is a read-only command, we can 119 * safely return NULL here, and provide a more consistent behavior 120 * to clients accessign expired values in a read-only fashion, that 121 * will say the key as non existing. 122 * 123 * Notably this covers GETs when slaves are used to scale reads. 
*/ 124 if (server.current_client && 125 server.current_client != server.master && 126 server.current_client->cmd && 127 server.current_client->cmd->flags & CMD_READONLY) 128 { 129 server.stat_keyspace_misses++; 130 return NULL; 131 } 132 } 133 val = lookupKey(db,key,flags); 134 if (val == NULL) 135 server.stat_keyspace_misses++; 136 else 137 server.stat_keyspace_hits++; 138 return val; 139 } 140 141 /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the 142 * common case. */ 143 robj *lookupKeyRead(redisDb *db, robj *key) { 144 return lookupKeyReadWithFlags(db,key,LOOKUP_NONE); 145 } 146 147 /* Lookup a key for write operations, and as a side effect, if needed, expires 148 * the key if its TTL is reached. 149 * 150 * Returns the linked value object if the key exists or NULL if the key 151 * does not exist in the specified DB. */ 152 robj *lookupKeyWrite(redisDb *db, robj *key) { 153 expireIfNeeded(db,key); 154 return lookupKey(db,key,LOOKUP_NONE); 155 } 156 157 robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) { 158 robj *o = lookupKeyRead(c->db, key); 159 if (!o) addReply(c,reply); 160 return o; 161 } 162 163 robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { 164 robj *o = lookupKeyWrite(c->db, key); 165 if (!o) addReply(c,reply); 166 return o; 167 } 168 169 /* Add the key to the DB. It's up to the caller to increment the reference 170 * counter of the value if needed. 171 * 172 * The program is aborted if the key already exists. */ 173 void dbAdd(redisDb *db, robj *key, robj *val) { 174 sds copy = sdsdup(key->ptr); 175 int retval = dictAdd(db->dict, copy, val); 176 177 serverAssertWithInfo(NULL,key,retval == DICT_OK); 178 if (val->type == OBJ_LIST || 179 val->type == OBJ_ZSET) 180 signalKeyAsReady(db, key); 181 if (server.cluster_enabled) slotToKeyAdd(key); 182 } 183 184 /* Overwrite an existing key with a new value. Incrementing the reference 185 * count of the new value is up to the caller. 
186 * This function does not modify the expire time of the existing key. 187 * 188 * The program is aborted if the key was not already present. */ 189 void dbOverwrite(redisDb *db, robj *key, robj *val) { 190 dictEntry *de = dictFind(db->dict,key->ptr); 191 192 serverAssertWithInfo(NULL,key,de != NULL); 193 dictEntry auxentry = *de; 194 robj *old = dictGetVal(de); 195 if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { 196 val->lru = old->lru; 197 } 198 dictSetVal(db->dict, de, val); 199 200 if (server.lazyfree_lazy_server_del) { 201 freeObjAsync(old); 202 dictSetVal(db->dict, &auxentry, NULL); 203 } 204 205 dictFreeVal(db->dict, &auxentry); 206 } 207 208 /* High level Set operation. This function can be used in order to set 209 * a key, whatever it was existing or not, to a new object. 210 * 211 * 1) The ref count of the value object is incremented. 212 * 2) clients WATCHing for the destination key notified. 213 * 3) The expire time of the key is reset (the key is made persistent). 214 * 215 * All the new keys in the database should be created via this interface. */ 216 void setKey(redisDb *db, robj *key, robj *val) { 217 if (lookupKeyWrite(db,key) == NULL) { 218 dbAdd(db,key,val); 219 } else { 220 dbOverwrite(db,key,val); 221 } 222 incrRefCount(val); 223 removeExpire(db,key); 224 signalModifiedKey(db,key); 225 } 226 227 int dbExists(redisDb *db, robj *key) { 228 return dictFind(db->dict,key->ptr) != NULL; 229 } 230 231 /* Return a random key, in form of a Redis object. 232 * If there are no keys, NULL is returned. 233 * 234 * The function makes sure to return keys not already expired. 
*/ 235 robj *dbRandomKey(redisDb *db) { 236 dictEntry *de; 237 int maxtries = 100; 238 int allvolatile = dictSize(db->dict) == dictSize(db->expires); 239 240 while(1) { 241 sds key; 242 robj *keyobj; 243 244 de = dictGetRandomKey(db->dict); 245 if (de == NULL) return NULL; 246 247 key = dictGetKey(de); 248 keyobj = createStringObject(key,sdslen(key)); 249 if (dictFind(db->expires,key)) { 250 if (allvolatile && server.masterhost && --maxtries == 0) { 251 /* If the DB is composed only of keys with an expire set, 252 * it could happen that all the keys are already logically 253 * expired in the slave, so the function cannot stop because 254 * expireIfNeeded() is false, nor it can stop because 255 * dictGetRandomKey() returns NULL (there are keys to return). 256 * To prevent the infinite loop we do some tries, but if there 257 * are the conditions for an infinite loop, eventually we 258 * return a key name that may be already expired. */ 259 return keyobj; 260 } 261 if (expireIfNeeded(db,keyobj)) { 262 decrRefCount(keyobj); 263 continue; /* search for another key. This expired. */ 264 } 265 } 266 return keyobj; 267 } 268 } 269 270 /* Delete a key, value, and associated expiration entry if any, from the DB */ 271 int dbSyncDelete(redisDb *db, robj *key) { 272 /* Deleting an entry from the expires dict will not free the sds of 273 * the key, because it is shared with the main dictionary. */ 274 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); 275 if (dictDelete(db->dict,key->ptr) == DICT_OK) { 276 if (server.cluster_enabled) slotToKeyDel(key); 277 return 1; 278 } else { 279 return 0; 280 } 281 } 282 283 /* This is a wrapper whose behavior depends on the Redis lazy free 284 * configuration. Deletes the key synchronously or asynchronously. */ 285 int dbDelete(redisDb *db, robj *key) { 286 return server.lazyfree_lazy_server_del ? 
dbAsyncDelete(db,key) : 287 dbSyncDelete(db,key); 288 } 289 290 /* Prepare the string object stored at 'key' to be modified destructively 291 * to implement commands like SETBIT or APPEND. 292 * 293 * An object is usually ready to be modified unless one of the two conditions 294 * are true: 295 * 296 * 1) The object 'o' is shared (refcount > 1), we don't want to affect 297 * other users. 298 * 2) The object encoding is not "RAW". 299 * 300 * If the object is found in one of the above conditions (or both) by the 301 * function, an unshared / not-encoded copy of the string object is stored 302 * at 'key' in the specified 'db'. Otherwise the object 'o' itself is 303 * returned. 304 * 305 * USAGE: 306 * 307 * The object 'o' is what the caller already obtained by looking up 'key' 308 * in 'db', the usage pattern looks like this: 309 * 310 * o = lookupKeyWrite(db,key); 311 * if (checkType(c,o,OBJ_STRING)) return; 312 * o = dbUnshareStringValue(db,key,o); 313 * 314 * At this point the caller is ready to modify the object, for example 315 * using an sdscat() call to append some data, or anything else. 316 */ 317 robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { 318 serverAssert(o->type == OBJ_STRING); 319 if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) { 320 robj *decoded = getDecodedObject(o); 321 o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr)); 322 decrRefCount(decoded); 323 dbOverwrite(db,key,o); 324 } 325 return o; 326 } 327 328 /* Remove all keys from all the databases in a Redis server. 329 * If callback is given the function is called from time to time to 330 * signal that work is in progress. 331 * 332 * The dbnum can be -1 if all the DBs should be flushed, or the specified 333 * DB number if we want to flush only a single Redis database number. 
334 * 335 * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or 336 * EMPTYDB_ASYNC if we want the memory to be freed in a different thread 337 * and the function to return ASAP. 338 * 339 * On success the fuction returns the number of keys removed from the 340 * database(s). Otherwise -1 is returned in the specific case the 341 * DB number is out of range, and errno is set to EINVAL. */ 342 long long emptyDb(int dbnum, int flags, void(callback)(void*)) { 343 int async = (flags & EMPTYDB_ASYNC); 344 long long removed = 0; 345 346 if (dbnum < -1 || dbnum >= server.dbnum) { 347 errno = EINVAL; 348 return -1; 349 } 350 351 int startdb, enddb; 352 if (dbnum == -1) { 353 startdb = 0; 354 enddb = server.dbnum-1; 355 } else { 356 startdb = enddb = dbnum; 357 } 358 359 for (int j = startdb; j <= enddb; j++) { 360 removed += dictSize(server.db[j].dict); 361 if (async) { 362 emptyDbAsync(&server.db[j]); 363 } else { 364 dictEmpty(server.db[j].dict,callback); 365 dictEmpty(server.db[j].expires,callback); 366 } 367 } 368 if (server.cluster_enabled) { 369 if (async) { 370 slotToKeyFlushAsync(); 371 } else { 372 slotToKeyFlush(); 373 } 374 } 375 if (dbnum == -1) flushSlaveKeysWithExpireList(); 376 return removed; 377 } 378 379 int selectDb(client *c, int id) { 380 if (id < 0 || id >= server.dbnum) 381 return C_ERR; 382 c->db = &server.db[id]; 383 return C_OK; 384 } 385 386 /*----------------------------------------------------------------------------- 387 * Hooks for key space changes. 388 * 389 * Every time a key in the database is modified the function 390 * signalModifiedKey() is called. 391 * 392 * Every time a DB is flushed the function signalFlushDb() is called. 
393 *----------------------------------------------------------------------------*/ 394 395 void signalModifiedKey(redisDb *db, robj *key) { 396 touchWatchedKey(db,key); 397 } 398 399 void signalFlushedDb(int dbid) { 400 touchWatchedKeysOnFlush(dbid); 401 } 402 403 /*----------------------------------------------------------------------------- 404 * Type agnostic commands operating on the key space 405 *----------------------------------------------------------------------------*/ 406 407 /* Return the set of flags to use for the emptyDb() call for FLUSHALL 408 * and FLUSHDB commands. 409 * 410 * Currently the command just attempts to parse the "ASYNC" option. It 411 * also checks if the command arity is wrong. 412 * 413 * On success C_OK is returned and the flags are stored in *flags, otherwise 414 * C_ERR is returned and the function sends an error to the client. */ 415 int getFlushCommandFlags(client *c, int *flags) { 416 /* Parse the optional ASYNC option. */ 417 if (c->argc > 1) { 418 if (c->argc > 2 || strcasecmp(c->argv[1]->ptr,"async")) { 419 addReply(c,shared.syntaxerr); 420 return C_ERR; 421 } 422 *flags = EMPTYDB_ASYNC; 423 } else { 424 *flags = EMPTYDB_NO_FLAGS; 425 } 426 return C_OK; 427 } 428 429 /* FLUSHDB [ASYNC] 430 * 431 * Flushes the currently SELECTed Redis DB. */ 432 void flushdbCommand(client *c) { 433 int flags; 434 435 if (getFlushCommandFlags(c,&flags) == C_ERR) return; 436 signalFlushedDb(c->db->id); 437 server.dirty += emptyDb(c->db->id,flags,NULL); 438 addReply(c,shared.ok); 439 } 440 441 /* FLUSHALL [ASYNC] 442 * 443 * Flushes the whole server data set. 
*/ 444 void flushallCommand(client *c) { 445 int flags; 446 447 if (getFlushCommandFlags(c,&flags) == C_ERR) return; 448 signalFlushedDb(-1); 449 server.dirty += emptyDb(-1,flags,NULL); 450 addReply(c,shared.ok); 451 if (server.rdb_child_pid != -1) { 452 kill(server.rdb_child_pid,SIGUSR1); 453 rdbRemoveTempFile(server.rdb_child_pid); 454 } 455 if (server.saveparamslen > 0) { 456 /* Normally rdbSave() will reset dirty, but we don't want this here 457 * as otherwise FLUSHALL will not be replicated nor put into the AOF. */ 458 int saved_dirty = server.dirty; 459 rdbSaveInfo rsi, *rsiptr; 460 rsiptr = rdbPopulateSaveInfo(&rsi); 461 rdbSave(server.rdb_filename,rsiptr); 462 server.dirty = saved_dirty; 463 } 464 server.dirty++; 465 } 466 467 /* This command implements DEL and LAZYDEL. */ 468 void delGenericCommand(client *c, int lazy) { 469 int numdel = 0, j; 470 471 for (j = 1; j < c->argc; j++) { 472 expireIfNeeded(c->db,c->argv[j]); 473 int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) : 474 dbSyncDelete(c->db,c->argv[j]); 475 if (deleted) { 476 signalModifiedKey(c->db,c->argv[j]); 477 notifyKeyspaceEvent(NOTIFY_GENERIC, 478 "del",c->argv[j],c->db->id); 479 server.dirty++; 480 numdel++; 481 } 482 } 483 addReplyLongLong(c,numdel); 484 } 485 486 void delCommand(client *c) { 487 delGenericCommand(c,0); 488 } 489 490 void unlinkCommand(client *c) { 491 delGenericCommand(c,1); 492 } 493 494 /* EXISTS key1 key2 ... key_N. 495 * Return value is the number of keys existing. 
*/ 496 void existsCommand(client *c) { 497 long long count = 0; 498 int j; 499 500 for (j = 1; j < c->argc; j++) { 501 if (lookupKeyRead(c->db,c->argv[j])) count++; 502 } 503 addReplyLongLong(c,count); 504 } 505 506 void selectCommand(client *c) { 507 long id; 508 509 if (getLongFromObjectOrReply(c, c->argv[1], &id, 510 "invalid DB index") != C_OK) 511 return; 512 513 if (server.cluster_enabled && id != 0) { 514 addReplyError(c,"SELECT is not allowed in cluster mode"); 515 return; 516 } 517 if (selectDb(c,id) == C_ERR) { 518 addReplyError(c,"DB index is out of range"); 519 } else { 520 addReply(c,shared.ok); 521 } 522 } 523 524 void randomkeyCommand(client *c) { 525 robj *key; 526 527 if ((key = dbRandomKey(c->db)) == NULL) { 528 addReply(c,shared.nullbulk); 529 return; 530 } 531 532 addReplyBulk(c,key); 533 decrRefCount(key); 534 } 535 536 void keysCommand(client *c) { 537 dictIterator *di; 538 dictEntry *de; 539 sds pattern = c->argv[1]->ptr; 540 int plen = sdslen(pattern), allkeys; 541 unsigned long numkeys = 0; 542 void *replylen = addDeferredMultiBulkLength(c); 543 544 di = dictGetSafeIterator(c->db->dict); 545 allkeys = (pattern[0] == '*' && pattern[1] == '\0'); 546 while((de = dictNext(di)) != NULL) { 547 sds key = dictGetKey(de); 548 robj *keyobj; 549 550 if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) { 551 keyobj = createStringObject(key,sdslen(key)); 552 if (!keyIsExpired(c->db,keyobj)) { 553 addReplyBulk(c,keyobj); 554 numkeys++; 555 } 556 decrRefCount(keyobj); 557 } 558 } 559 dictReleaseIterator(di); 560 setDeferredMultiBulkLength(c,replylen,numkeys); 561 } 562 563 /* This callback is used by scanGenericCommand in order to collect elements 564 * returned by the dictionary iterator into a list. 
*/ 565 void scanCallback(void *privdata, const dictEntry *de) { 566 void **pd = (void**) privdata; 567 list *keys = pd[0]; 568 robj *o = pd[1]; 569 robj *key, *val = NULL; 570 571 if (o == NULL) { 572 sds sdskey = dictGetKey(de); 573 key = createStringObject(sdskey, sdslen(sdskey)); 574 } else if (o->type == OBJ_SET) { 575 sds keysds = dictGetKey(de); 576 key = createStringObject(keysds,sdslen(keysds)); 577 } else if (o->type == OBJ_HASH) { 578 sds sdskey = dictGetKey(de); 579 sds sdsval = dictGetVal(de); 580 key = createStringObject(sdskey,sdslen(sdskey)); 581 val = createStringObject(sdsval,sdslen(sdsval)); 582 } else if (o->type == OBJ_ZSET) { 583 sds sdskey = dictGetKey(de); 584 key = createStringObject(sdskey,sdslen(sdskey)); 585 val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0); 586 } else { 587 serverPanic("Type not handled in SCAN callback."); 588 } 589 590 listAddNodeTail(keys, key); 591 if (val) listAddNodeTail(keys, val); 592 } 593 594 /* Try to parse a SCAN cursor stored at object 'o': 595 * if the cursor is valid, store it as unsigned integer into *cursor and 596 * returns C_OK. Otherwise return C_ERR and send an error to the 597 * client. */ 598 int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) { 599 char *eptr; 600 601 /* Use strtoul() because we need an *unsigned* long, so 602 * getLongLongFromObject() does not cover the whole cursor space. */ 603 errno = 0; 604 *cursor = strtoul(o->ptr, &eptr, 10); 605 if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE) 606 { 607 addReplyError(c, "invalid cursor"); 608 return C_ERR; 609 } 610 return C_OK; 611 } 612 613 /* This command implements SCAN, HSCAN and SSCAN commands. 614 * If object 'o' is passed, then it must be a Hash or Set object, otherwise 615 * if 'o' is NULL the command will operate on the dictionary associated with 616 * the current database. 
617 * 618 * When 'o' is not NULL the function assumes that the first argument in 619 * the client arguments vector is a key so it skips it before iterating 620 * in order to parse options. 621 * 622 * In the case of a Hash object the function returns both the field and value 623 * of every element on the Hash. */ 624 void scanGenericCommand(client *c, robj *o, unsigned long cursor) { 625 int i, j; 626 list *keys = listCreate(); 627 listNode *node, *nextnode; 628 long count = 10; 629 sds pat = NULL; 630 int patlen = 0, use_pattern = 0; 631 dict *ht; 632 633 /* Object must be NULL (to iterate keys names), or the type of the object 634 * must be Set, Sorted Set, or Hash. */ 635 serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH || 636 o->type == OBJ_ZSET); 637 638 /* Set i to the first option argument. The previous one is the cursor. */ 639 i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */ 640 641 /* Step 1: Parse options. */ 642 while (i < c->argc) { 643 j = c->argc - i; 644 if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) { 645 if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL) 646 != C_OK) 647 { 648 goto cleanup; 649 } 650 651 if (count < 1) { 652 addReply(c,shared.syntaxerr); 653 goto cleanup; 654 } 655 656 i += 2; 657 } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) { 658 pat = c->argv[i+1]->ptr; 659 patlen = sdslen(pat); 660 661 /* The pattern always matches if it is exactly "*", so it is 662 * equivalent to disabling it. */ 663 use_pattern = !(pat[0] == '*' && patlen == 1); 664 665 i += 2; 666 } else { 667 addReply(c,shared.syntaxerr); 668 goto cleanup; 669 } 670 } 671 672 /* Step 2: Iterate the collection. 673 * 674 * Note that if the object is encoded with a ziplist, intset, or any other 675 * representation that is not a hash table, we are sure that it is also 676 * composed of a small number of elements. 
So to avoid taking state we 677 * just return everything inside the object in a single call, setting the 678 * cursor to zero to signal the end of the iteration. */ 679 680 /* Handle the case of a hash table. */ 681 ht = NULL; 682 if (o == NULL) { 683 ht = c->db->dict; 684 } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) { 685 ht = o->ptr; 686 } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) { 687 ht = o->ptr; 688 count *= 2; /* We return key / value for this type. */ 689 } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) { 690 zset *zs = o->ptr; 691 ht = zs->dict; 692 count *= 2; /* We return key / value for this type. */ 693 } 694 695 if (ht) { 696 void *privdata[2]; 697 /* We set the max number of iterations to ten times the specified 698 * COUNT, so if the hash table is in a pathological state (very 699 * sparsely populated) we avoid to block too much time at the cost 700 * of returning no or very few elements. */ 701 long maxiterations = count*10; 702 703 /* We pass two pointers to the callback: the list to which it will 704 * add new elements, and the object containing the dictionary so that 705 * it is possible to fetch more data in a type-dependent way. */ 706 privdata[0] = keys; 707 privdata[1] = o; 708 do { 709 cursor = dictScan(ht, cursor, scanCallback, NULL, privdata); 710 } while (cursor && 711 maxiterations-- && 712 listLength(keys) < (unsigned long)count); 713 } else if (o->type == OBJ_SET) { 714 int pos = 0; 715 int64_t ll; 716 717 while(intsetGet(o->ptr,pos++,&ll)) 718 listAddNodeTail(keys,createStringObjectFromLongLong(ll)); 719 cursor = 0; 720 } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) { 721 unsigned char *p = ziplistIndex(o->ptr,0); 722 unsigned char *vstr; 723 unsigned int vlen; 724 long long vll; 725 726 while(p) { 727 ziplistGet(p,&vstr,&vlen,&vll); 728 listAddNodeTail(keys, 729 (vstr != NULL) ? 
createStringObject((char*)vstr,vlen) : 730 createStringObjectFromLongLong(vll)); 731 p = ziplistNext(o->ptr,p); 732 } 733 cursor = 0; 734 } else { 735 serverPanic("Not handled encoding in SCAN."); 736 } 737 738 /* Step 3: Filter elements. */ 739 node = listFirst(keys); 740 while (node) { 741 robj *kobj = listNodeValue(node); 742 nextnode = listNextNode(node); 743 int filter = 0; 744 745 /* Filter element if it does not match the pattern. */ 746 if (!filter && use_pattern) { 747 if (sdsEncodedObject(kobj)) { 748 if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0)) 749 filter = 1; 750 } else { 751 char buf[LONG_STR_SIZE]; 752 int len; 753 754 serverAssert(kobj->encoding == OBJ_ENCODING_INT); 755 len = ll2string(buf,sizeof(buf),(long)kobj->ptr); 756 if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1; 757 } 758 } 759 760 /* Filter element if it is an expired key. */ 761 if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1; 762 763 /* Remove the element and its associted value if needed. */ 764 if (filter) { 765 decrRefCount(kobj); 766 listDelNode(keys, node); 767 } 768 769 /* If this is a hash or a sorted set, we have a flat list of 770 * key-value elements, so if this element was filtered, remove the 771 * value, or skip it if it was not filtered: we only match keys. */ 772 if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) { 773 node = nextnode; 774 nextnode = listNextNode(node); 775 if (filter) { 776 kobj = listNodeValue(node); 777 decrRefCount(kobj); 778 listDelNode(keys, node); 779 } 780 } 781 node = nextnode; 782 } 783 784 /* Step 4: Reply to the client. 
*/ 785 addReplyMultiBulkLen(c, 2); 786 addReplyBulkLongLong(c,cursor); 787 788 addReplyMultiBulkLen(c, listLength(keys)); 789 while ((node = listFirst(keys)) != NULL) { 790 robj *kobj = listNodeValue(node); 791 addReplyBulk(c, kobj); 792 decrRefCount(kobj); 793 listDelNode(keys, node); 794 } 795 796 cleanup: 797 listSetFreeMethod(keys,decrRefCountVoid); 798 listRelease(keys); 799 } 800 801 /* The SCAN command completely relies on scanGenericCommand. */ 802 void scanCommand(client *c) { 803 unsigned long cursor; 804 if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return; 805 scanGenericCommand(c,NULL,cursor); 806 } 807 808 void dbsizeCommand(client *c) { 809 addReplyLongLong(c,dictSize(c->db->dict)); 810 } 811 812 void lastsaveCommand(client *c) { 813 addReplyLongLong(c,server.lastsave); 814 } 815 816 void typeCommand(client *c) { 817 robj *o; 818 char *type; 819 820 o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH); 821 if (o == NULL) { 822 type = "none"; 823 } else { 824 switch(o->type) { 825 case OBJ_STRING: type = "string"; break; 826 case OBJ_LIST: type = "list"; break; 827 case OBJ_SET: type = "set"; break; 828 case OBJ_ZSET: type = "zset"; break; 829 case OBJ_HASH: type = "hash"; break; 830 case OBJ_STREAM: type = "stream"; break; 831 case OBJ_MODULE: { 832 moduleValue *mv = o->ptr; 833 type = mv->type->name; 834 }; break; 835 default: type = "unknown"; break; 836 } 837 } 838 addReplyStatus(c,type); 839 } 840 841 void shutdownCommand(client *c) { 842 int flags = 0; 843 844 if (c->argc > 2) { 845 addReply(c,shared.syntaxerr); 846 return; 847 } else if (c->argc == 2) { 848 if (!strcasecmp(c->argv[1]->ptr,"nosave")) { 849 flags |= SHUTDOWN_NOSAVE; 850 } else if (!strcasecmp(c->argv[1]->ptr,"save")) { 851 flags |= SHUTDOWN_SAVE; 852 } else { 853 addReply(c,shared.syntaxerr); 854 return; 855 } 856 } 857 /* When SHUTDOWN is called while the server is loading a dataset in 858 * memory we need to make sure no attempt is performed to save 859 * 
the dataset on shutdown (otherwise it could overwrite the current DB 860 * with half-read data). 861 * 862 * Also when in Sentinel mode clear the SAVE flag and force NOSAVE. */ 863 if (server.loading || server.sentinel_mode) 864 flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE; 865 if (prepareForShutdown(flags) == C_OK) exit(0); 866 addReplyError(c,"Errors trying to SHUTDOWN. Check logs."); 867 } 868 869 void renameGenericCommand(client *c, int nx) { 870 robj *o; 871 long long expire; 872 int samekey = 0; 873 874 /* When source and dest key is the same, no operation is performed, 875 * if the key exists, however we still return an error on unexisting key. */ 876 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1; 877 878 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL) 879 return; 880 881 if (samekey) { 882 addReply(c,nx ? shared.czero : shared.ok); 883 return; 884 } 885 886 incrRefCount(o); 887 expire = getExpire(c->db,c->argv[1]); 888 if (lookupKeyWrite(c->db,c->argv[2]) != NULL) { 889 if (nx) { 890 decrRefCount(o); 891 addReply(c,shared.czero); 892 return; 893 } 894 /* Overwrite: delete the old key before creating the new one 895 * with the same name. */ 896 dbDelete(c->db,c->argv[2]); 897 } 898 dbAdd(c->db,c->argv[2],o); 899 if (expire != -1) setExpire(c,c->db,c->argv[2],expire); 900 dbDelete(c->db,c->argv[1]); 901 signalModifiedKey(c->db,c->argv[1]); 902 signalModifiedKey(c->db,c->argv[2]); 903 notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", 904 c->argv[1],c->db->id); 905 notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to", 906 c->argv[2],c->db->id); 907 server.dirty++; 908 addReply(c,nx ? 
shared.cone : shared.ok); 909 } 910 911 void renameCommand(client *c) { 912 renameGenericCommand(c,0); 913 } 914 915 void renamenxCommand(client *c) { 916 renameGenericCommand(c,1); 917 } 918 919 void moveCommand(client *c) { 920 robj *o; 921 redisDb *src, *dst; 922 int srcid; 923 long long dbid, expire; 924 925 if (server.cluster_enabled) { 926 addReplyError(c,"MOVE is not allowed in cluster mode"); 927 return; 928 } 929 930 /* Obtain source and target DB pointers */ 931 src = c->db; 932 srcid = c->db->id; 933 934 if (getLongLongFromObject(c->argv[2],&dbid) == C_ERR || 935 dbid < INT_MIN || dbid > INT_MAX || 936 selectDb(c,dbid) == C_ERR) 937 { 938 addReply(c,shared.outofrangeerr); 939 return; 940 } 941 dst = c->db; 942 selectDb(c,srcid); /* Back to the source DB */ 943 944 /* If the user is moving using as target the same 945 * DB as the source DB it is probably an error. */ 946 if (src == dst) { 947 addReply(c,shared.sameobjecterr); 948 return; 949 } 950 951 /* Check if the element exists and get a reference */ 952 o = lookupKeyWrite(c->db,c->argv[1]); 953 if (!o) { 954 addReply(c,shared.czero); 955 return; 956 } 957 expire = getExpire(c->db,c->argv[1]); 958 959 /* Return zero if the key already exists in the target DB */ 960 if (lookupKeyWrite(dst,c->argv[1]) != NULL) { 961 addReply(c,shared.czero); 962 return; 963 } 964 dbAdd(dst,c->argv[1],o); 965 if (expire != -1) setExpire(c,dst,c->argv[1],expire); 966 incrRefCount(o); 967 968 /* OK! key moved, free the entry in the source DB */ 969 dbDelete(src,c->argv[1]); 970 server.dirty++; 971 addReply(c,shared.cone); 972 } 973 974 /* Helper function for dbSwapDatabases(): scans the list of keys that have 975 * one or more blocked clients for B[LR]POP or other blocking commands 976 * and signal the keys as ready if they are of the right type. See the comment 977 * where the function is used for more info. 
*/ 978 void scanDatabaseForReadyLists(redisDb *db) { 979 dictEntry *de; 980 dictIterator *di = dictGetSafeIterator(db->blocking_keys); 981 while((de = dictNext(di)) != NULL) { 982 robj *key = dictGetKey(de); 983 robj *value = lookupKey(db,key,LOOKUP_NOTOUCH); 984 if (value && (value->type == OBJ_LIST || 985 value->type == OBJ_STREAM || 986 value->type == OBJ_ZSET)) 987 signalKeyAsReady(db, key); 988 } 989 dictReleaseIterator(di); 990 } 991 992 /* Swap two databases at runtime so that all clients will magically see 993 * the new database even if already connected. Note that the client 994 * structure c->db points to a given DB, so we need to be smarter and 995 * swap the underlying referenced structures, otherwise we would need 996 * to fix all the references to the Redis DB structure. 997 * 998 * Returns C_ERR if at least one of the DB ids are out of range, otherwise 999 * C_OK is returned. */ 1000 int dbSwapDatabases(int id1, int id2) { 1001 if (id1 < 0 || id1 >= server.dbnum || 1002 id2 < 0 || id2 >= server.dbnum) return C_ERR; 1003 if (id1 == id2) return C_OK; 1004 redisDb aux = server.db[id1]; 1005 redisDb *db1 = &server.db[id1], *db2 = &server.db[id2]; 1006 1007 /* Swap hash tables. Note that we don't swap blocking_keys, 1008 * ready_keys and watched_keys, since we want clients to 1009 * remain in the same DB they were. */ 1010 db1->dict = db2->dict; 1011 db1->expires = db2->expires; 1012 db1->avg_ttl = db2->avg_ttl; 1013 1014 db2->dict = aux.dict; 1015 db2->expires = aux.expires; 1016 db2->avg_ttl = aux.avg_ttl; 1017 1018 /* Now we need to handle clients blocked on lists: as an effect 1019 * of swapping the two DBs, a client that was waiting for list 1020 * X in a given DB, may now actually be unblocked if X happens 1021 * to exist in the new version of the DB, after the swap. 1022 * 1023 * However normally we only do this check for efficiency reasons 1024 * in dbAdd() when a list is created. 
So here we need to rescan 1025 * the list of clients blocked on lists and signal lists as ready 1026 * if needed. */ 1027 scanDatabaseForReadyLists(db1); 1028 scanDatabaseForReadyLists(db2); 1029 return C_OK; 1030 } 1031 1032 /* SWAPDB db1 db2 */ 1033 void swapdbCommand(client *c) { 1034 long id1, id2; 1035 1036 /* Not allowed in cluster mode: we have just DB 0 there. */ 1037 if (server.cluster_enabled) { 1038 addReplyError(c,"SWAPDB is not allowed in cluster mode"); 1039 return; 1040 } 1041 1042 /* Get the two DBs indexes. */ 1043 if (getLongFromObjectOrReply(c, c->argv[1], &id1, 1044 "invalid first DB index") != C_OK) 1045 return; 1046 1047 if (getLongFromObjectOrReply(c, c->argv[2], &id2, 1048 "invalid second DB index") != C_OK) 1049 return; 1050 1051 /* Swap... */ 1052 if (dbSwapDatabases(id1,id2) == C_ERR) { 1053 addReplyError(c,"DB index is out of range"); 1054 return; 1055 } else { 1056 server.dirty++; 1057 addReply(c,shared.ok); 1058 } 1059 } 1060 1061 /*----------------------------------------------------------------------------- 1062 * Expires API 1063 *----------------------------------------------------------------------------*/ 1064 1065 int removeExpire(redisDb *db, robj *key) { 1066 /* An expire may only be removed if there is a corresponding entry in the 1067 * main dict. Otherwise, the key will never be freed. */ 1068 serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL); 1069 return dictDelete(db->expires,key->ptr) == DICT_OK; 1070 } 1071 1072 /* Set an expire to the specified key. If the expire is set in the context 1073 * of an user calling a command 'c' is the client, otherwise 'c' is set 1074 * to NULL. The 'when' parameter is the absolute unix time in milliseconds 1075 * after which the key will no longer be considered valid. 
*/ 1076 void setExpire(client *c, redisDb *db, robj *key, long long when) { 1077 dictEntry *kde, *de; 1078 1079 /* Reuse the sds from the main dict in the expire dict */ 1080 kde = dictFind(db->dict,key->ptr); 1081 serverAssertWithInfo(NULL,key,kde != NULL); 1082 de = dictAddOrFind(db->expires,dictGetKey(kde)); 1083 dictSetSignedIntegerVal(de,when); 1084 1085 int writable_slave = server.masterhost && server.repl_slave_ro == 0; 1086 if (c && writable_slave && !(c->flags & CLIENT_MASTER)) 1087 rememberSlaveKeyWithExpire(db,key); 1088 } 1089 1090 /* Return the expire time of the specified key, or -1 if no expire 1091 * is associated with this key (i.e. the key is non volatile) */ 1092 long long getExpire(redisDb *db, robj *key) { 1093 dictEntry *de; 1094 1095 /* No expire? return ASAP */ 1096 if (dictSize(db->expires) == 0 || 1097 (de = dictFind(db->expires,key->ptr)) == NULL) return -1; 1098 1099 /* The entry was found in the expire dict, this means it should also 1100 * be present in the main dict (safety check). */ 1101 serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL); 1102 return dictGetSignedIntegerVal(de); 1103 } 1104 1105 /* Propagate expires into slaves and the AOF file. 1106 * When a key expires in the master, a DEL operation for this key is sent 1107 * to all the slaves and the AOF file if enabled. 1108 * 1109 * This way the key expiry is centralized in one place, and since both 1110 * AOF and the master->slave link guarantee operation ordering, everything 1111 * will be consistent even if we allow write operations against expiring 1112 * keys. */ 1113 void propagateExpire(redisDb *db, robj *key, int lazy) { 1114 robj *argv[2]; 1115 1116 argv[0] = lazy ? 
shared.unlink : shared.del; 1117 argv[1] = key; 1118 incrRefCount(argv[0]); 1119 incrRefCount(argv[1]); 1120 1121 if (server.aof_state != AOF_OFF) 1122 feedAppendOnlyFile(server.delCommand,db->id,argv,2); 1123 replicationFeedSlaves(server.slaves,db->id,argv,2); 1124 1125 decrRefCount(argv[0]); 1126 decrRefCount(argv[1]); 1127 } 1128 1129 /* Check if the key is expired. */ 1130 int keyIsExpired(redisDb *db, robj *key) { 1131 mstime_t when = getExpire(db,key); 1132 1133 if (when < 0) return 0; /* No expire for this key */ 1134 1135 /* Don't expire anything while loading. It will be done later. */ 1136 if (server.loading) return 0; 1137 1138 /* If we are in the context of a Lua script, we pretend that time is 1139 * blocked to when the Lua script started. This way a key can expire 1140 * only the first time it is accessed and not in the middle of the 1141 * script execution, making propagation to slaves / AOF consistent. 1142 * See issue #1525 on Github for more information. */ 1143 mstime_t now = server.lua_caller ? server.lua_time_start : mstime(); 1144 1145 return now > when; 1146 } 1147 1148 /* This function is called when we are going to perform some operation 1149 * in a given key, but such key may be already logically expired even if 1150 * it still exists in the database. The main way this function is called 1151 * is via lookupKey*() family of functions. 1152 * 1153 * The behavior of the function depends on the replication role of the 1154 * instance, because slave instances do not expire keys, they wait 1155 * for DELs from the master for consistency matters. However even 1156 * slaves will try to have a coherent return value for the function, 1157 * so that read commands executed in the slave side will be able to 1158 * behave like if the key is expired even if still present (because the 1159 * master has yet to propagate the DEL). 1160 * 1161 * In masters as a side effect of finding a key which is expired, such 1162 * key will be evicted from the database. 
Also this may trigger the
 * propagation of a DEL/UNLINK command in AOF / replication stream.
 *
 * The return value of the function is 0 if the key is still valid,
 * otherwise the function returns 1 if the key is expired. */
int expireIfNeeded(redisDb *db, robj *key) {
    int deleted;

    /* Nothing to do for keys that are not logically expired. */
    if (!keyIsExpired(db,key)) {
        return 0;
    }

    /* When a master host is configured (slave context) the key is not
     * removed here: the expiration is driven by the master, which sends
     * synthesized DEL operations for expired keys. The caller is still
     * told that the key should be considered expired. */
    if (server.masterhost != NULL) {
        return 1;
    }

    /* Account for the expired key, propagate the deletion, fire the
     * keyspace event and finally remove the key, in this order. */
    server.stat_expiredkeys++;
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",key,db->id);

    if (server.lazyfree_lazy_expire) {
        deleted = dbAsyncDelete(db,key);
    } else {
        deleted = dbSyncDelete(db,key);
    }
    return deleted;
}

/* -----------------------------------------------------------------------------
 * API to get key arguments from commands
 * ---------------------------------------------------------------------------*/

/* The base case is to use the keys position as given in the command table
 * (firstkey, lastkey, step).
*/
int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, int *numkeys) {
    UNUSED(argv);

    /* With 'firstkey' set to zero no key position is reported. */
    if (cmd->firstkey == 0) {
        *numkeys = 0;
        return NULL;
    }

    /* A negative 'lastkey' is an offset from the end of the arguments. */
    int last = (cmd->lastkey < 0) ? argc + cmd->lastkey : cmd->lastkey;
    int *keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
    int found = 0;
    int pos = cmd->firstkey;

    while (pos <= last) {
        if (pos >= argc) {
            /* Modules commands, and standard commands with a not fixed number
             * of arguments (negative arity parameter) do not have dispatch
             * time arity checks, so we need to handle the case where the user
             * passed an invalid number of arguments here. In this case we
             * return no keys and expect the command implementation to report
             * an arity or syntax error. */
            if ((cmd->flags & CMD_MODULE) || cmd->arity < 0) {
                zfree(keys);
                *numkeys = 0;
                return NULL;
            } else {
                serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
            }
        }
        keys[found++] = pos;
        pos += cmd->keystep;
    }

    *numkeys = found;
    return keys;
}

/* Return all the arguments that are keys in the command passed via argc / argv.
 *
 * The command returns the positions of all the key arguments inside the array,
 * so the actual return value is an heap allocated array of integers. The
 * length of the array is returned by reference into *numkeys.
 *
 * 'cmd' must point to the corresponding entry into the redisCommand
 * table, according to the command name in argv[0].
 *
 * This function uses the command table if a command-specific helper function
 * is not required, otherwise it calls the command-specific function.
*/ 1240 int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { 1241 if (cmd->flags & CMD_MODULE_GETKEYS) { 1242 return moduleGetCommandKeysViaAPI(cmd,argv,argc,numkeys); 1243 } else if (!(cmd->flags & CMD_MODULE) && cmd->getkeys_proc) { 1244 return cmd->getkeys_proc(cmd,argv,argc,numkeys); 1245 } else { 1246 return getKeysUsingCommandTable(cmd,argv,argc,numkeys); 1247 } 1248 } 1249 1250 /* Free the result of getKeysFromCommand. */ 1251 void getKeysFreeResult(int *result) { 1252 zfree(result); 1253 } 1254 1255 /* Helper function to extract keys from following commands: 1256 * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options> 1257 * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */ 1258 int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { 1259 int i, num, *keys; 1260 UNUSED(cmd); 1261 1262 num = atoi(argv[2]->ptr); 1263 /* Sanity check. Don't return any key if the command is going to 1264 * reply with syntax error. */ 1265 if (num < 1 || num > (argc-3)) { 1266 *numkeys = 0; 1267 return NULL; 1268 } 1269 1270 /* Keys in z{union,inter}store come from two places: 1271 * argv[1] = storage key, 1272 * argv[3...n] = keys to intersect */ 1273 keys = zmalloc(sizeof(int)*(num+1)); 1274 1275 /* Add all key positions for argv[3...n] to keys[] */ 1276 for (i = 0; i < num; i++) keys[i] = 3+i; 1277 1278 /* Finally add the argv[1] key position (the storage key target). */ 1279 keys[num] = 1; 1280 *numkeys = num+1; /* Total keys = {union,inter} keys + storage key */ 1281 return keys; 1282 } 1283 1284 /* Helper function to extract keys from the following commands: 1285 * EVAL <script> <num-keys> <key> <key> ... <key> [more stuff] 1286 * EVALSHA <script> <num-keys> <key> <key> ... 
<key> [more stuff] */ 1287 int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { 1288 int i, num, *keys; 1289 UNUSED(cmd); 1290 1291 num = atoi(argv[2]->ptr); 1292 /* Sanity check. Don't return any key if the command is going to 1293 * reply with syntax error. */ 1294 if (num <= 0 || num > (argc-3)) { 1295 *numkeys = 0; 1296 return NULL; 1297 } 1298 1299 keys = zmalloc(sizeof(int)*num); 1300 *numkeys = num; 1301 1302 /* Add all key positions for argv[3...n] to keys[] */ 1303 for (i = 0; i < num; i++) keys[i] = 3+i; 1304 1305 return keys; 1306 } 1307 1308 /* Helper function to extract keys from the SORT command. 1309 * 1310 * SORT <sort-key> ... STORE <store-key> ... 1311 * 1312 * The first argument of SORT is always a key, however a list of options 1313 * follow in SQL-alike style. Here we parse just the minimum in order to 1314 * correctly identify keys in the "STORE" option. */ 1315 int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { 1316 int i, j, num, *keys, found_store = 0; 1317 UNUSED(cmd); 1318 1319 num = 0; 1320 keys = zmalloc(sizeof(int)*2); /* Alloc 2 places for the worst case. */ 1321 1322 keys[num++] = 1; /* <sort-key> is always present. */ 1323 1324 /* Search for STORE option. By default we consider options to don't 1325 * have arguments, so if we find an unknown option name we scan the 1326 * next. However there are options with 1 or 2 arguments, so we 1327 * provide a list here in order to skip the right number of args. */ 1328 struct { 1329 char *name; 1330 int skip; 1331 } skiplist[] = { 1332 {"limit", 2}, 1333 {"get", 1}, 1334 {"by", 1}, 1335 {NULL, 0} /* End of elements. 
*/ 1336 }; 1337 1338 for (i = 2; i < argc; i++) { 1339 for (j = 0; skiplist[j].name != NULL; j++) { 1340 if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) { 1341 i += skiplist[j].skip; 1342 break; 1343 } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) { 1344 /* Note: we don't increment "num" here and continue the loop 1345 * to be sure to process the *last* "STORE" option if multiple 1346 * ones are provided. This is same behavior as SORT. */ 1347 found_store = 1; 1348 keys[num] = i+1; /* <store-key> */ 1349 break; 1350 } 1351 } 1352 } 1353 *numkeys = num + found_store; 1354 return keys; 1355 } 1356 1357 int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { 1358 int i, num, first, *keys; 1359 UNUSED(cmd); 1360 1361 /* Assume the obvious form. */ 1362 first = 3; 1363 num = 1; 1364 1365 /* But check for the extended one with the KEYS option. */ 1366 if (argc > 6) { 1367 for (i = 6; i < argc; i++) { 1368 if (!strcasecmp(argv[i]->ptr,"keys") && 1369 sdslen(argv[3]->ptr) == 0) 1370 { 1371 first = i+1; 1372 num = argc-first; 1373 break; 1374 } 1375 } 1376 } 1377 1378 keys = zmalloc(sizeof(int)*num); 1379 for (i = 0; i < num; i++) keys[i] = first+i; 1380 *numkeys = num; 1381 return keys; 1382 } 1383 1384 /* Helper function to extract keys from following commands: 1385 * GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC] 1386 * [COUNT count] [STORE key] [STOREDIST key] 1387 * GEORADIUSBYMEMBER key member radius unit ... options ... */ 1388 int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { 1389 int i, num, *keys; 1390 UNUSED(cmd); 1391 1392 /* Check for the presence of the stored key in the command */ 1393 int stored_key = -1; 1394 for (i = 5; i < argc; i++) { 1395 char *arg = argv[i]->ptr; 1396 /* For the case when user specifies both "store" and "storedist" options, the 1397 * second key specified would override the first key. 
This behavior is kept 1398 * the same as in georadiusCommand method. 1399 */ 1400 if ((!strcasecmp(arg, "store") || !strcasecmp(arg, "storedist")) && ((i+1) < argc)) { 1401 stored_key = i+1; 1402 i++; 1403 } 1404 } 1405 num = 1 + (stored_key == -1 ? 0 : 1); 1406 1407 /* Keys in the command come from two places: 1408 * argv[1] = key, 1409 * argv[5...n] = stored key if present 1410 */ 1411 keys = zmalloc(sizeof(int) * num); 1412 1413 /* Add all key positions to keys[] */ 1414 keys[0] = 1; 1415 if(num > 1) { 1416 keys[1] = stored_key; 1417 } 1418 *numkeys = num; 1419 return keys; 1420 } 1421 1422 /* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>] 1423 * STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N */ 1424 int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { 1425 int i, num = 0, *keys; 1426 UNUSED(cmd); 1427 1428 /* We need to parse the options of the command in order to seek the first 1429 * "STREAMS" string which is actually the option. This is needed because 1430 * "STREAMS" could also be the name of the consumer group and even the 1431 * name of the stream key. */ 1432 int streams_pos = -1; 1433 for (i = 1; i < argc; i++) { 1434 char *arg = argv[i]->ptr; 1435 if (!strcasecmp(arg, "block")) { 1436 i++; /* Skip option argument. */ 1437 } else if (!strcasecmp(arg, "count")) { 1438 i++; /* Skip option argument. */ 1439 } else if (!strcasecmp(arg, "group")) { 1440 i += 2; /* Skip option argument. */ 1441 } else if (!strcasecmp(arg, "noack")) { 1442 /* Nothing to do. */ 1443 } else if (!strcasecmp(arg, "streams")) { 1444 streams_pos = i; 1445 break; 1446 } else { 1447 break; /* Syntax error. */ 1448 } 1449 } 1450 if (streams_pos != -1) num = argc - streams_pos - 1; 1451 1452 /* Syntax error. */ 1453 if (streams_pos == -1 || num == 0 || num % 2 != 0) { 1454 *numkeys = 0; 1455 return NULL; 1456 } 1457 num /= 2; /* We have half the keys as there are arguments because 1458 there are also the IDs, one per key. 
*/ 1459 1460 keys = zmalloc(sizeof(int) * num); 1461 for (i = streams_pos+1; i < argc-num; i++) keys[i-streams_pos-1] = i; 1462 *numkeys = num; 1463 return keys; 1464 } 1465 1466 /* Slot to Key API. This is used by Redis Cluster in order to obtain in 1467 * a fast way a key that belongs to a specified hash slot. This is useful 1468 * while rehashing the cluster and in other conditions when we need to 1469 * understand if we have keys for a given hash slot. */ 1470 void slotToKeyUpdateKey(robj *key, int add) { 1471 unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr)); 1472 unsigned char buf[64]; 1473 unsigned char *indexed = buf; 1474 size_t keylen = sdslen(key->ptr); 1475 1476 server.cluster->slots_keys_count[hashslot] += add ? 1 : -1; 1477 if (keylen+2 > 64) indexed = zmalloc(keylen+2); 1478 indexed[0] = (hashslot >> 8) & 0xff; 1479 indexed[1] = hashslot & 0xff; 1480 memcpy(indexed+2,key->ptr,keylen); 1481 if (add) { 1482 raxInsert(server.cluster->slots_to_keys,indexed,keylen+2,NULL,NULL); 1483 } else { 1484 raxRemove(server.cluster->slots_to_keys,indexed,keylen+2,NULL); 1485 } 1486 if (indexed != buf) zfree(indexed); 1487 } 1488 1489 void slotToKeyAdd(robj *key) { 1490 slotToKeyUpdateKey(key,1); 1491 } 1492 1493 void slotToKeyDel(robj *key) { 1494 slotToKeyUpdateKey(key,0); 1495 } 1496 1497 void slotToKeyFlush(void) { 1498 raxFree(server.cluster->slots_to_keys); 1499 server.cluster->slots_to_keys = raxNew(); 1500 memset(server.cluster->slots_keys_count,0, 1501 sizeof(server.cluster->slots_keys_count)); 1502 } 1503 1504 /* Pupulate the specified array of objects with keys in the specified slot. 1505 * New objects are returned to represent keys, it's up to the caller to 1506 * decrement the reference count to release the keys names. 
*/ 1507 unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) { 1508 raxIterator iter; 1509 int j = 0; 1510 unsigned char indexed[2]; 1511 1512 indexed[0] = (hashslot >> 8) & 0xff; 1513 indexed[1] = hashslot & 0xff; 1514 raxStart(&iter,server.cluster->slots_to_keys); 1515 raxSeek(&iter,">=",indexed,2); 1516 while(count-- && raxNext(&iter)) { 1517 if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break; 1518 keys[j++] = createStringObject((char*)iter.key+2,iter.key_len-2); 1519 } 1520 raxStop(&iter); 1521 return j; 1522 } 1523 1524 /* Remove all the keys in the specified hash slot. 1525 * The number of removed items is returned. */ 1526 unsigned int delKeysInSlot(unsigned int hashslot) { 1527 raxIterator iter; 1528 int j = 0; 1529 unsigned char indexed[2]; 1530 1531 indexed[0] = (hashslot >> 8) & 0xff; 1532 indexed[1] = hashslot & 0xff; 1533 raxStart(&iter,server.cluster->slots_to_keys); 1534 while(server.cluster->slots_keys_count[hashslot]) { 1535 raxSeek(&iter,">=",indexed,2); 1536 raxNext(&iter); 1537 1538 robj *key = createStringObject((char*)iter.key+2,iter.key_len-2); 1539 dbDelete(&server.db[0],key); 1540 decrRefCount(key); 1541 j++; 1542 } 1543 raxStop(&iter); 1544 return j; 1545 } 1546 1547 unsigned int countKeysInSlot(unsigned int hashslot) { 1548 return server.cluster->slots_keys_count[hashslot]; 1549 } 1550