1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31 #include "cluster.h"
32
33 #include <signal.h>
34 #include <ctype.h>
35
36 void slotToKeyAdd(robj *key);
37 void slotToKeyDel(robj *key);
38 void slotToKeyFlush(void);
39
/*-----------------------------------------------------------------------------
 * C-level DB API
 *----------------------------------------------------------------------------*/
43
44 /* Low level key lookup API, not actually called directly from commands
45 * implementations that should instead rely on lookupKeyRead(),
46 * lookupKeyWrite() and lookupKeyReadWithFlags(). */
lookupKey(redisDb * db,robj * key,int flags)47 robj *lookupKey(redisDb *db, robj *key, int flags) {
48 dictEntry *de = dictFind(db->dict,key->ptr);
49 if (de) {
50 robj *val = dictGetVal(de);
51
52 /* Update the access time for the ageing algorithm.
53 * Don't do it if we have a saving child, as this will trigger
54 * a copy on write madness. */
55 if (server.rdb_child_pid == -1 &&
56 server.aof_child_pid == -1 &&
57 !(flags & LOOKUP_NOTOUCH))
58 {
59 val->lru = LRU_CLOCK();
60 }
61 return val;
62 } else {
63 return NULL;
64 }
65 }
66
67 /* Lookup a key for read operations, or return NULL if the key is not found
68 * in the specified DB.
69 *
70 * As a side effect of calling this function:
71 * 1. A key gets expired if it reached it's TTL.
72 * 2. The key last access time is updated.
73 * 3. The global keys hits/misses stats are updated (reported in INFO).
74 *
75 * This API should not be used when we write to the key after obtaining
76 * the object linked to the key, but only for read only operations.
77 *
78 * Flags change the behavior of this command:
79 *
80 * LOOKUP_NONE (or zero): no special flags are passed.
81 * LOOKUP_NOTOUCH: don't alter the last access time of the key.
82 *
83 * Note: this function also returns NULL is the key is logically expired
84 * but still existing, in case this is a slave, since this API is called only
85 * for read operations. Even if the key expiry is master-driven, we can
86 * correctly report a key is expired on slaves even if the master is lagging
87 * expiring our key via DELs in the replication link. */
lookupKeyReadWithFlags(redisDb * db,robj * key,int flags)88 robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
89 robj *val;
90
91 if (expireIfNeeded(db,key) == 1) {
92 /* Key expired. If we are in the context of a master, expireIfNeeded()
93 * returns 0 only when the key does not exist at all, so it's save
94 * to return NULL ASAP. */
95 if (server.masterhost == NULL) return NULL;
96
97 /* However if we are in the context of a slave, expireIfNeeded() will
98 * not really try to expire the key, it only returns information
99 * about the "logical" status of the key: key expiring is up to the
100 * master in order to have a consistent view of master's data set.
101 *
102 * However, if the command caller is not the master, and as additional
103 * safety measure, the command invoked is a read-only command, we can
104 * safely return NULL here, and provide a more consistent behavior
105 * to clients accessign expired values in a read-only fashion, that
106 * will say the key as non exisitng.
107 *
108 * Notably this covers GETs when slaves are used to scale reads. */
109 if (server.current_client &&
110 server.current_client != server.master &&
111 server.current_client->cmd &&
112 server.current_client->cmd->flags & CMD_READONLY)
113 {
114 return NULL;
115 }
116 }
117 val = lookupKey(db,key,flags);
118 if (val == NULL)
119 server.stat_keyspace_misses++;
120 else
121 server.stat_keyspace_hits++;
122 return val;
123 }
124
125 /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
126 * common case. */
lookupKeyRead(redisDb * db,robj * key)127 robj *lookupKeyRead(redisDb *db, robj *key) {
128 return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
129 }
130
131 /* Lookup a key for write operations, and as a side effect, if needed, expires
132 * the key if its TTL is reached.
133 *
134 * Returns the linked value object if the key exists or NULL if the key
135 * does not exist in the specified DB. */
lookupKeyWrite(redisDb * db,robj * key)136 robj *lookupKeyWrite(redisDb *db, robj *key) {
137 expireIfNeeded(db,key);
138 return lookupKey(db,key,LOOKUP_NONE);
139 }
140
/* Read lookup of 'key' in the client's current DB. When the key is missing
 * 'reply' is sent to the client and NULL is returned; otherwise the value
 * object is returned and nothing is sent. */
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
    robj *val = lookupKeyRead(c->db, key);

    if (val == NULL) addReply(c,reply);
    return val;
}
146
/* Write lookup of 'key' in the client's current DB. When the key is missing
 * 'reply' is sent to the client and NULL is returned; otherwise the value
 * object is returned and nothing is sent. */
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
    robj *val = lookupKeyWrite(c->db, key);

    if (val == NULL) addReply(c,reply);
    return val;
}
152
153 /* Add the key to the DB. It's up to the caller to increment the reference
154 * counter of the value if needed.
155 *
156 * The program is aborted if the key already exists. */
dbAdd(redisDb * db,robj * key,robj * val)157 void dbAdd(redisDb *db, robj *key, robj *val) {
158 sds copy = sdsdup(key->ptr);
159 int retval = dictAdd(db->dict, copy, val);
160
161 serverAssertWithInfo(NULL,key,retval == DICT_OK);
162 if (val->type == OBJ_LIST) signalListAsReady(db, key);
163 if (server.cluster_enabled) slotToKeyAdd(key);
164 }
165
166 /* Overwrite an existing key with a new value. Incrementing the reference
167 * count of the new value is up to the caller.
168 * This function does not modify the expire time of the existing key.
169 *
170 * The program is aborted if the key was not already present. */
dbOverwrite(redisDb * db,robj * key,robj * val)171 void dbOverwrite(redisDb *db, robj *key, robj *val) {
172 dictEntry *de = dictFind(db->dict,key->ptr);
173
174 serverAssertWithInfo(NULL,key,de != NULL);
175 dictReplace(db->dict, key->ptr, val);
176 }
177
178 /* High level Set operation. This function can be used in order to set
179 * a key, whatever it was existing or not, to a new object.
180 *
181 * 1) The ref count of the value object is incremented.
182 * 2) clients WATCHing for the destination key notified.
183 * 3) The expire time of the key is reset (the key is made persistent). */
setKey(redisDb * db,robj * key,robj * val)184 void setKey(redisDb *db, robj *key, robj *val) {
185 if (lookupKeyWrite(db,key) == NULL) {
186 dbAdd(db,key,val);
187 } else {
188 dbOverwrite(db,key,val);
189 }
190 incrRefCount(val);
191 removeExpire(db,key);
192 signalModifiedKey(db,key);
193 }
194
/* Return 1 if 'key' is present in the main dictionary of 'db', 0 otherwise.
 * No expire check is performed here. */
int dbExists(redisDb *db, robj *key) {
    int found = (dictFind(db->dict,key->ptr) != NULL);

    return found;
}
198
199 /* Return a random key, in form of a Redis object.
200 * If there are no keys, NULL is returned.
201 *
202 * The function makes sure to return keys not already expired. */
dbRandomKey(redisDb * db)203 robj *dbRandomKey(redisDb *db) {
204 dictEntry *de;
205
206 while(1) {
207 sds key;
208 robj *keyobj;
209
210 de = dictGetRandomKey(db->dict);
211 if (de == NULL) return NULL;
212
213 key = dictGetKey(de);
214 keyobj = createStringObject(key,sdslen(key));
215 if (dictFind(db->expires,key)) {
216 if (expireIfNeeded(db,keyobj)) {
217 decrRefCount(keyobj);
218 continue; /* search for another key. This expired. */
219 }
220 }
221 return keyobj;
222 }
223 }
224
225 /* Delete a key, value, and associated expiration entry if any, from the DB */
dbDelete(redisDb * db,robj * key)226 int dbDelete(redisDb *db, robj *key) {
227 /* Deleting an entry from the expires dict will not free the sds of
228 * the key, because it is shared with the main dictionary. */
229 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
230 if (dictDelete(db->dict,key->ptr) == DICT_OK) {
231 if (server.cluster_enabled) slotToKeyDel(key);
232 return 1;
233 } else {
234 return 0;
235 }
236 }
237
238 /* Prepare the string object stored at 'key' to be modified destructively
239 * to implement commands like SETBIT or APPEND.
240 *
241 * An object is usually ready to be modified unless one of the two conditions
242 * are true:
243 *
244 * 1) The object 'o' is shared (refcount > 1), we don't want to affect
245 * other users.
246 * 2) The object encoding is not "RAW".
247 *
248 * If the object is found in one of the above conditions (or both) by the
249 * function, an unshared / not-encoded copy of the string object is stored
250 * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
251 * returned.
252 *
253 * USAGE:
254 *
255 * The object 'o' is what the caller already obtained by looking up 'key'
256 * in 'db', the usage pattern looks like this:
257 *
258 * o = lookupKeyWrite(db,key);
259 * if (checkType(c,o,OBJ_STRING)) return;
260 * o = dbUnshareStringValue(db,key,o);
261 *
262 * At this point the caller is ready to modify the object, for example
263 * using an sdscat() call to append some data, or anything else.
264 */
dbUnshareStringValue(redisDb * db,robj * key,robj * o)265 robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
266 serverAssert(o->type == OBJ_STRING);
267 if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
268 robj *decoded = getDecodedObject(o);
269 o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
270 decrRefCount(decoded);
271 dbOverwrite(db,key,o);
272 }
273 return o;
274 }
275
emptyDb(void (callback)(void *))276 long long emptyDb(void(callback)(void*)) {
277 int j;
278 long long removed = 0;
279
280 for (j = 0; j < server.dbnum; j++) {
281 removed += dictSize(server.db[j].dict);
282 dictEmpty(server.db[j].dict,callback);
283 dictEmpty(server.db[j].expires,callback);
284 }
285 if (server.cluster_enabled) slotToKeyFlush();
286 return removed;
287 }
288
/* Make server.db[id] the current DB of client 'c'.
 * Returns C_OK on success, or C_ERR (leaving the client untouched) when
 * 'id' is outside the range 0 .. server.dbnum-1. */
int selectDb(client *c, int id) {
    int valid = (id >= 0 && id < server.dbnum);

    if (!valid) return C_ERR;
    c->db = &server.db[id];
    return C_OK;
}
295
/*-----------------------------------------------------------------------------
 * Hooks for key space changes.
 *
 * Every time a key in the database is modified the function
 * signalModifiedKey() is called.
 *
 * Every time a DB is flushed the function signalFlushedDb() is called.
 *----------------------------------------------------------------------------*/
304
/* Hook to call whenever 'key' in 'db' is modified: it forwards the event
 * to touchWatchedKey(). */
void signalModifiedKey(redisDb *db, robj *key) {
    touchWatchedKey(db,key);
}
308
/* Hook to call whenever a DB is flushed: it forwards 'dbid' to
 * touchWatchedKeysOnFlush(). FLUSHALL passes -1 as 'dbid'. */
void signalFlushedDb(int dbid) {
    touchWatchedKeysOnFlush(dbid);
}
312
/*-----------------------------------------------------------------------------
 * Type agnostic commands operating on the key space
 *----------------------------------------------------------------------------*/
316
/* FLUSHDB: remove every key from the client's current DB.
 * The number of keys removed is added to server.dirty. */
void flushdbCommand(client *c) {
    /* Account for the change and signal it before the dictionaries are
     * actually emptied. */
    server.dirty += dictSize(c->db->dict);
    signalFlushedDb(c->db->id);

    dictEmpty(c->db->dict,NULL);
    dictEmpty(c->db->expires,NULL);
    if (server.cluster_enabled) {
        slotToKeyFlush();
    }

    addReply(c,shared.ok);
}
325
/* FLUSHALL: remove every key from every DB.
 *
 * If an RDB child is running it is sent SIGUSR1 and its temp file is
 * removed. If save points are configured (server.saveparamslen > 0) a new
 * RDB file is saved right away. */
void flushallCommand(client *c) {
    signalFlushedDb(-1);
    server.dirty += emptyDb(NULL);
    addReply(c,shared.ok);
    if (server.rdb_child_pid != -1) {
        kill(server.rdb_child_pid,SIGUSR1);
        rdbRemoveTempFile(server.rdb_child_pid);
    }
    if (server.saveparamslen > 0) {
        /* Normally rdbSave() will reset dirty, but we don't want this here
         * as otherwise FLUSHALL will not be replicated nor put into the AOF.
         *
         * The saved copy is a long long: server.dirty has just been
         * increased by emptyDb()'s long long result, so an int copy could
         * truncate a large counter. */
        long long saved_dirty = server.dirty;
        rdbSave(server.rdb_filename);
        server.dirty = saved_dirty;
    }
    server.dirty++;
}
343
/* DEL key1 key2 ... key_N.
 * Replies with the number of keys actually removed. Keys whose TTL has
 * been reached are expired first. */
void delCommand(client *c) {
    int deleted = 0;
    int j;

    for (j = 1; j < c->argc; j++) {
        expireIfNeeded(c->db,c->argv[j]);
        if (!dbDelete(c->db,c->argv[j])) continue;

        signalModifiedKey(c->db,c->argv[j]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[j],c->db->id);
        server.dirty++;
        deleted++;
    }
    addReplyLongLong(c,deleted);
}
359
360 /* EXISTS key1 key2 ... key_N.
361 * Return value is the number of keys existing. */
existsCommand(client * c)362 void existsCommand(client *c) {
363 long long count = 0;
364 int j;
365
366 for (j = 1; j < c->argc; j++) {
367 expireIfNeeded(c->db,c->argv[j]);
368 if (dbExists(c->db,c->argv[j])) count++;
369 }
370 addReplyLongLong(c,count);
371 }
372
/* SELECT index.
 * Switch the client to the DB with the given index. In cluster mode only
 * index 0 is accepted. An index that is not a valid integer, or that does
 * not identify an existing DB, yields an "invalid DB index" error. */
void selectCommand(client *c) {
    long id;

    if (getLongFromObjectOrReply(c, c->argv[1], &id,
        "invalid DB index") != C_OK)
        return;

    if (server.cluster_enabled && id != 0) {
        addReplyError(c,"SELECT is not allowed in cluster mode");
        return;
    }

    /* selectDb() takes an int: reject values that do not fit before the
     * conversion, otherwise a huge index could be narrowed into the ID of
     * an existing DB and silently accepted. */
    if (id < INT_MIN || id > INT_MAX || selectDb(c,(int)id) == C_ERR) {
        addReplyError(c,"invalid DB index");
    } else {
        addReply(c,shared.ok);
    }
}
390
/* RANDOMKEY: reply with a random key name from the current DB, or with a
 * null bulk reply when dbRandomKey() finds no key. */
void randomkeyCommand(client *c) {
    robj *key = dbRandomKey(c->db);

    if (key != NULL) {
        addReplyBulk(c,key);
        decrRefCount(key); /* Release the object created by dbRandomKey(). */
    } else {
        addReply(c,shared.nullbulk);
    }
}
402
/* KEYS pattern.
 * Reply with every key name of the current DB matching the glob-style
 * pattern in argv[1]. Keys found to be expired while iterating are not
 * returned. The reply length is only known at the end, so a deferred
 * multi bulk length is used. */
void keysCommand(client *c) {
    dictIterator *di;
    dictEntry *de;
    sds pattern = c->argv[1]->ptr;
    int plen = sdslen(pattern), allkeys;
    unsigned long numkeys = 0;
    void *replylen = addDeferredMultiBulkLength(c);

    /* A safe iterator is used: expireIfNeeded() may delete entries while
     * the dictionary is being walked. */
    di = dictGetSafeIterator(c->db->dict);

    /* The pattern "*" matches everything: skip the matching step. */
    allkeys = (pattern[0] == '*' && pattern[1] == '\0');

    for (de = dictNext(di); de != NULL; de = dictNext(di)) {
        sds name = dictGetKey(de);
        robj *keyobj;

        if (!allkeys && !stringmatchlen(pattern,plen,name,sdslen(name),0))
            continue;

        keyobj = createStringObject(name,sdslen(name));
        if (expireIfNeeded(c->db,keyobj) == 0) {
            addReplyBulk(c,keyobj);
            numkeys++;
        }
        decrRefCount(keyobj);
    }
    dictReleaseIterator(di);
    setDeferredMultiBulkLength(c,replylen,numkeys);
}
429
430 /* This callback is used by scanGenericCommand in order to collect elements
431 * returned by the dictionary iterator into a list. */
scanCallback(void * privdata,const dictEntry * de)432 void scanCallback(void *privdata, const dictEntry *de) {
433 void **pd = (void**) privdata;
434 list *keys = pd[0];
435 robj *o = pd[1];
436 robj *key, *val = NULL;
437
438 if (o == NULL) {
439 sds sdskey = dictGetKey(de);
440 key = createStringObject(sdskey, sdslen(sdskey));
441 } else if (o->type == OBJ_SET) {
442 key = dictGetKey(de);
443 incrRefCount(key);
444 } else if (o->type == OBJ_HASH) {
445 key = dictGetKey(de);
446 incrRefCount(key);
447 val = dictGetVal(de);
448 incrRefCount(val);
449 } else if (o->type == OBJ_ZSET) {
450 key = dictGetKey(de);
451 incrRefCount(key);
452 val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
453 } else {
454 serverPanic("Type not handled in SCAN callback.");
455 }
456
457 listAddNodeTail(keys, key);
458 if (val) listAddNodeTail(keys, val);
459 }
460
461 /* Try to parse a SCAN cursor stored at object 'o':
462 * if the cursor is valid, store it as unsigned integer into *cursor and
463 * returns C_OK. Otherwise return C_ERR and send an error to the
464 * client. */
parseScanCursorOrReply(client * c,robj * o,unsigned long * cursor)465 int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
466 char *eptr;
467
468 /* Use strtoul() because we need an *unsigned* long, so
469 * getLongLongFromObject() does not cover the whole cursor space. */
470 errno = 0;
471 *cursor = strtoul(o->ptr, &eptr, 10);
472 if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE)
473 {
474 addReplyError(c, "invalid cursor");
475 return C_ERR;
476 }
477 return C_OK;
478 }
479
480 /* This command implements SCAN, HSCAN and SSCAN commands.
481 * If object 'o' is passed, then it must be a Hash or Set object, otherwise
482 * if 'o' is NULL the command will operate on the dictionary associated with
483 * the current database.
484 *
485 * When 'o' is not NULL the function assumes that the first argument in
486 * the client arguments vector is a key so it skips it before iterating
487 * in order to parse options.
488 *
489 * In the case of a Hash object the function returns both the field and value
490 * of every element on the Hash. */
scanGenericCommand(client * c,robj * o,unsigned long cursor)491 void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
492 int i, j;
493 list *keys = listCreate();
494 listNode *node, *nextnode;
495 long count = 10;
496 sds pat = NULL;
497 int patlen = 0, use_pattern = 0;
498 dict *ht;
499
500 /* Object must be NULL (to iterate keys names), or the type of the object
501 * must be Set, Sorted Set, or Hash. */
502 serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
503 o->type == OBJ_ZSET);
504
505 /* Set i to the first option argument. The previous one is the cursor. */
506 i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
507
508 /* Step 1: Parse options. */
509 while (i < c->argc) {
510 j = c->argc - i;
511 if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
512 if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
513 != C_OK)
514 {
515 goto cleanup;
516 }
517
518 if (count < 1) {
519 addReply(c,shared.syntaxerr);
520 goto cleanup;
521 }
522
523 i += 2;
524 } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
525 pat = c->argv[i+1]->ptr;
526 patlen = sdslen(pat);
527
528 /* The pattern always matches if it is exactly "*", so it is
529 * equivalent to disabling it. */
530 use_pattern = !(pat[0] == '*' && patlen == 1);
531
532 i += 2;
533 } else {
534 addReply(c,shared.syntaxerr);
535 goto cleanup;
536 }
537 }
538
539 /* Step 2: Iterate the collection.
540 *
541 * Note that if the object is encoded with a ziplist, intset, or any other
542 * representation that is not a hash table, we are sure that it is also
543 * composed of a small number of elements. So to avoid taking state we
544 * just return everything inside the object in a single call, setting the
545 * cursor to zero to signal the end of the iteration. */
546
547 /* Handle the case of a hash table. */
548 ht = NULL;
549 if (o == NULL) {
550 ht = c->db->dict;
551 } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
552 ht = o->ptr;
553 } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
554 ht = o->ptr;
555 count *= 2; /* We return key / value for this type. */
556 } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
557 zset *zs = o->ptr;
558 ht = zs->dict;
559 count *= 2; /* We return key / value for this type. */
560 }
561
562 if (ht) {
563 void *privdata[2];
564 /* We set the max number of iterations to ten times the specified
565 * COUNT, so if the hash table is in a pathological state (very
566 * sparsely populated) we avoid to block too much time at the cost
567 * of returning no or very few elements. */
568 long maxiterations = count*10;
569
570 /* We pass two pointers to the callback: the list to which it will
571 * add new elements, and the object containing the dictionary so that
572 * it is possible to fetch more data in a type-dependent way. */
573 privdata[0] = keys;
574 privdata[1] = o;
575 do {
576 cursor = dictScan(ht, cursor, scanCallback, privdata);
577 } while (cursor &&
578 maxiterations-- &&
579 listLength(keys) < (unsigned long)count);
580 } else if (o->type == OBJ_SET) {
581 int pos = 0;
582 int64_t ll;
583
584 while(intsetGet(o->ptr,pos++,&ll))
585 listAddNodeTail(keys,createStringObjectFromLongLong(ll));
586 cursor = 0;
587 } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
588 unsigned char *p = ziplistIndex(o->ptr,0);
589 unsigned char *vstr;
590 unsigned int vlen;
591 long long vll;
592
593 while(p) {
594 ziplistGet(p,&vstr,&vlen,&vll);
595 listAddNodeTail(keys,
596 (vstr != NULL) ? createStringObject((char*)vstr,vlen) :
597 createStringObjectFromLongLong(vll));
598 p = ziplistNext(o->ptr,p);
599 }
600 cursor = 0;
601 } else {
602 serverPanic("Not handled encoding in SCAN.");
603 }
604
605 /* Step 3: Filter elements. */
606 node = listFirst(keys);
607 while (node) {
608 robj *kobj = listNodeValue(node);
609 nextnode = listNextNode(node);
610 int filter = 0;
611
612 /* Filter element if it does not match the pattern. */
613 if (!filter && use_pattern) {
614 if (sdsEncodedObject(kobj)) {
615 if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
616 filter = 1;
617 } else {
618 char buf[LONG_STR_SIZE];
619 int len;
620
621 serverAssert(kobj->encoding == OBJ_ENCODING_INT);
622 len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
623 if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
624 }
625 }
626
627 /* Filter element if it is an expired key. */
628 if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;
629
630 /* Remove the element and its associted value if needed. */
631 if (filter) {
632 decrRefCount(kobj);
633 listDelNode(keys, node);
634 }
635
636 /* If this is a hash or a sorted set, we have a flat list of
637 * key-value elements, so if this element was filtered, remove the
638 * value, or skip it if it was not filtered: we only match keys. */
639 if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
640 node = nextnode;
641 nextnode = listNextNode(node);
642 if (filter) {
643 kobj = listNodeValue(node);
644 decrRefCount(kobj);
645 listDelNode(keys, node);
646 }
647 }
648 node = nextnode;
649 }
650
651 /* Step 4: Reply to the client. */
652 addReplyMultiBulkLen(c, 2);
653 addReplyBulkLongLong(c,cursor);
654
655 addReplyMultiBulkLen(c, listLength(keys));
656 while ((node = listFirst(keys)) != NULL) {
657 robj *kobj = listNodeValue(node);
658 addReplyBulk(c, kobj);
659 decrRefCount(kobj);
660 listDelNode(keys, node);
661 }
662
663 cleanup:
664 listSetFreeMethod(keys,decrRefCountVoid);
665 listRelease(keys);
666 }
667
668 /* The SCAN command completely relies on scanGenericCommand. */
scanCommand(client * c)669 void scanCommand(client *c) {
670 unsigned long cursor;
671 if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
672 scanGenericCommand(c,NULL,cursor);
673 }
674
/* DBSIZE: reply with the number of entries in the main dictionary of the
 * client's current DB. */
void dbsizeCommand(client *c) {
    addReplyLongLong(c, dictSize(c->db->dict));
}
678
/* LASTSAVE: reply with the value of server.lastsave as an integer. */
void lastsaveCommand(client *c) {
    addReplyLongLong(c, server.lastsave);
}
682
/* TYPE key: reply with a status string naming the type of the value stored
 * at key, or "none" when the key does not exist. The lookup is performed
 * with LOOKUP_NOTOUCH, so the last access time of the key is not altered. */
void typeCommand(client *c) {
    char *type = "none";
    robj *o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);

    if (o != NULL) {
        if (o->type == OBJ_STRING)
            type = "string";
        else if (o->type == OBJ_LIST)
            type = "list";
        else if (o->type == OBJ_SET)
            type = "set";
        else if (o->type == OBJ_ZSET)
            type = "zset";
        else if (o->type == OBJ_HASH)
            type = "hash";
        else
            type = "unknown";
    }
    addReplyStatus(c,type);
}
702
/* SHUTDOWN [NOSAVE|SAVE].
 * Any other argument, or more than one argument, is a syntax error. If
 * prepareForShutdown() succeeds the process exits, otherwise an error is
 * returned to the client. */
void shutdownCommand(client *c) {
    int flags = 0;

    if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    }
    if (c->argc == 2) {
        const char *opt = c->argv[1]->ptr;

        if (strcasecmp(opt,"nosave") == 0) {
            flags |= SHUTDOWN_NOSAVE;
        } else if (strcasecmp(opt,"save") == 0) {
            flags |= SHUTDOWN_SAVE;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    /* When SHUTDOWN is called while the server is loading a dataset in
     * memory we need to make sure no attempt is performed to save
     * the dataset on shutdown (otherwise it could overwrite the current DB
     * with half-read data).
     *
     * Also when in Sentinel mode clear the SAVE flag and force NOSAVE. */
    if (server.loading || server.sentinel_mode) {
        flags &= ~SHUTDOWN_SAVE;
        flags |= SHUTDOWN_NOSAVE;
    }

    if (prepareForShutdown(flags) == C_OK) exit(0);
    addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
}
730
/* Shared implementation of RENAME (nx == 0) and RENAMENX (nx != 0).
 *
 * argv[1] is the source key and argv[2] the destination key. A missing
 * source key is an error. With 'nx' set an existing destination is left
 * untouched and zero is returned; otherwise the destination is replaced.
 * The expire of the source key, if any, is moved to the destination. */
void renameGenericCommand(client *c, int nx) {
    robj *src = c->argv[1];
    robj *dst = c->argv[2];
    robj *o;
    long long expire;
    int samekey;

    /* When source and dest key is the same, no operation is performed,
     * if the key exists, however we still return an error on unexisting key. */
    samekey = (sdscmp(src->ptr,dst->ptr) == 0);

    o = lookupKeyWriteOrReply(c,src,shared.nokeyerr);
    if (o == NULL) return;

    if (samekey) {
        if (nx)
            addReply(c,shared.czero);
        else
            addReply(c,shared.ok);
        return;
    }

    /* Hold a reference for the destination key: the one owned by the
     * source key goes away when the source is deleted below. */
    incrRefCount(o);
    expire = getExpire(c->db,src);

    if (lookupKeyWrite(c->db,dst) != NULL) {
        if (nx) {
            decrRefCount(o);
            addReply(c,shared.czero);
            return;
        }
        /* Overwrite: delete the old key before creating the new one
         * with the same name. */
        dbDelete(c->db,dst);
    }

    dbAdd(c->db,dst,o);
    if (expire != -1) setExpire(c->db,dst,expire);
    dbDelete(c->db,src);

    signalModifiedKey(c->db,src);
    signalModifiedKey(c->db,dst);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",src,c->db->id);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",dst,c->db->id);
    server.dirty++;

    if (nx)
        addReply(c,shared.cone);
    else
        addReply(c,shared.ok);
}
772
/* RENAME: rename a key, replacing the destination if it exists. */
void renameCommand(client *c) {
    renameGenericCommand(c, 0);
}
776
/* RENAMENX: rename a key only if the destination does not exist. */
void renamenxCommand(client *c) {
    renameGenericCommand(c, 1);
}
780
/* MOVE key db.
 * Move the key in argv[1] from the client's current DB to the DB whose
 * index is in argv[2], together with its expire if any.
 *
 * Replies 1 if the key was moved, 0 if the key does not exist in the
 * source DB or already exists in the target DB. Errors are returned in
 * cluster mode, for an invalid DB index, and when source and target are
 * the same DB. */
void moveCommand(client *c) {
    robj *o;
    redisDb *src, *dst;
    int srcid;
    long long dbid, expire;

    if (server.cluster_enabled) {
        addReplyError(c,"MOVE is not allowed in cluster mode");
        return;
    }

    /* Obtain source and target DB pointers. The target is resolved by
     * temporarily selecting it on the client; the range check guards the
     * long long to int conversion done when calling selectDb(). */
    src = c->db;
    srcid = c->db->id;

    if (getLongLongFromObject(c->argv[2],&dbid) == C_ERR ||
        dbid < INT_MIN || dbid > INT_MAX ||
        selectDb(c,dbid) == C_ERR)
    {
        addReply(c,shared.outofrangeerr);
        return;
    }
    dst = c->db;
    selectDb(c,srcid); /* Back to the source DB */

    /* If the user is moving using as target the same
     * DB as the source DB it is probably an error. */
    if (src == dst) {
        addReply(c,shared.sameobjecterr);
        return;
    }

    /* Check if the element exists and get a reference */
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (!o) {
        addReply(c,shared.czero);
        return;
    }
    expire = getExpire(c->db,c->argv[1]);

    /* Return zero if the key already exists in the target DB */
    if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
        addReply(c,shared.czero);
        return;
    }
    dbAdd(dst,c->argv[1],o);
    if (expire != -1) setExpire(dst,c->argv[1],expire);
    incrRefCount(o);

    /* OK! key moved, free the entry in the source DB */
    dbDelete(src,c->argv[1]);

    /* The key disappeared from the source DB and appeared in the target
     * DB: both are modifications, so signal them like the other commands
     * that alter the key space do. */
    signalModifiedKey(src,c->argv[1]);
    signalModifiedKey(dst,c->argv[1]);
    server.dirty++;
    addReply(c,shared.cone);
}
835
/*-----------------------------------------------------------------------------
 * Expires API
 *----------------------------------------------------------------------------*/
839
/* Remove the expire associated with 'key', if any.
 * Returns 1 if an expire entry was removed, 0 if the key had none.
 * The program is aborted if the key is not in the main dictionary. */
int removeExpire(redisDb *db, robj *key) {
    int rc;

    /* An expire may only be removed if there is a corresponding entry in the
     * main dict. Otherwise, the key will never be freed. */
    serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);

    rc = dictDelete(db->expires,key->ptr);
    return rc == DICT_OK;
}
846
/* Set the expire of 'key' to 'when', replacing any previous expire.
 * The program is aborted if the key is not in the main dictionary. */
void setExpire(redisDb *db, robj *key, long long when) {
    dictEntry *kde;
    dictEntry *expentry;

    kde = dictFind(db->dict,key->ptr);
    serverAssertWithInfo(NULL,key,kde != NULL);

    /* Reuse the sds from the main dict in the expire dict */
    expentry = dictReplaceRaw(db->expires,dictGetKey(kde));
    dictSetSignedIntegerVal(expentry,when);
}
856
857 /* Return the expire time of the specified key, or -1 if no expire
858 * is associated with this key (i.e. the key is non volatile) */
getExpire(redisDb * db,robj * key)859 long long getExpire(redisDb *db, robj *key) {
860 dictEntry *de;
861
862 /* No expire? return ASAP */
863 if (dictSize(db->expires) == 0 ||
864 (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
865
866 /* The entry was found in the expire dict, this means it should also
867 * be present in the main dict (safety check). */
868 serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
869 return dictGetSignedIntegerVal(de);
870 }
871
872 /* Propagate expires into slaves and the AOF file.
873 * When a key expires in the master, a DEL operation for this key is sent
874 * to all the slaves and the AOF file if enabled.
875 *
876 * This way the key expiry is centralized in one place, and since both
877 * AOF and the master->slave link guarantee operation ordering, everything
878 * will be consistent even if we allow write operations against expiring
879 * keys. */
propagateExpire(redisDb * db,robj * key)880 void propagateExpire(redisDb *db, robj *key) {
881 robj *argv[2];
882
883 argv[0] = shared.del;
884 argv[1] = key;
885 incrRefCount(argv[0]);
886 incrRefCount(argv[1]);
887
888 if (server.aof_state != AOF_OFF)
889 feedAppendOnlyFile(server.delCommand,db->id,argv,2);
890 replicationFeedSlaves(server.slaves,db->id,argv,2);
891
892 decrRefCount(argv[0]);
893 decrRefCount(argv[1]);
894 }
895
/* Check whether 'key' has reached its expire time and, in the context of a
 * master, delete it.
 *
 * Returns 0 when the key has no expire, is still valid, or the server is
 * loading a dataset. On a master, returns the result of dbDelete() after
 * propagating the expire and firing the "expired" keyspace event. On a
 * slave nothing is deleted: the return value only tells whether the key is
 * logically expired (1) or not (0). */
int expireIfNeeded(redisDb *db, robj *key) {
    mstime_t when = getExpire(db,key);
    mstime_t now;

    /* No expire for this key, or still loading: don't expire anything
     * while loading, it will be done later. */
    if (when < 0 || server.loading) return 0;

    /* If we are in the context of a Lua script, we claim that time is
     * blocked to when the Lua script started. This way a key can expire
     * only the first time it is accessed and not in the middle of the
     * script execution, making propagation to slaves / AOF consistent.
     * See issue #1525 on Github for more information. */
    if (server.lua_caller)
        now = server.lua_time_start;
    else
        now = mstime();

    /* If we are running in the context of a slave, return ASAP:
     * the slave key expiration is controlled by the master that will
     * send us synthesized DEL operations for expired keys.
     *
     * Still we try to return the right information to the caller,
     * that is, 0 if we think the key should be still valid, 1 if
     * we think the key is expired at this time. */
    if (server.masterhost != NULL) return now > when;

    /* The key has not expired yet. */
    if (now <= when) return 0;

    /* Delete the key */
    server.stat_expiredkeys++;
    propagateExpire(db,key);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",key,db->id);
    return dbDelete(db,key);
}
931
932 /*-----------------------------------------------------------------------------
933 * Expires Commands
934 *----------------------------------------------------------------------------*/
935
936 /* This is the generic command implementation for EXPIRE, PEXPIRE, EXPIREAT
937 * and PEXPIREAT. Because the commad second argument may be relative or absolute
938 * the "basetime" argument is used to signal what the base time is (either 0
939 * for *AT variants of the command, or the current time for relative expires).
940 *
941 * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
942 * the argv[2] parameter. The basetime is always specified in milliseconds. */
expireGenericCommand(client * c,long long basetime,int unit)943 void expireGenericCommand(client *c, long long basetime, int unit) {
944 robj *key = c->argv[1], *param = c->argv[2];
945 long long when; /* unix time in milliseconds when the key will expire. */
946
947 if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
948 return;
949
950 if (unit == UNIT_SECONDS) when *= 1000;
951 when += basetime;
952
953 /* No key, return zero. */
954 if (lookupKeyWrite(c->db,key) == NULL) {
955 addReply(c,shared.czero);
956 return;
957 }
958
959 /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
960 * should never be executed as a DEL when load the AOF or in the context
961 * of a slave instance.
962 *
963 * Instead we take the other branch of the IF statement setting an expire
964 * (possibly in the past) and wait for an explicit DEL from the master. */
965 if (when <= mstime() && !server.loading && !server.masterhost) {
966 robj *aux;
967
968 serverAssertWithInfo(c,key,dbDelete(c->db,key));
969 server.dirty++;
970
971 /* Replicate/AOF this as an explicit DEL. */
972 aux = createStringObject("DEL",3);
973 rewriteClientCommandVector(c,2,aux,key);
974 decrRefCount(aux);
975 signalModifiedKey(c->db,key);
976 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
977 addReply(c, shared.cone);
978 return;
979 } else {
980 setExpire(c->db,key,when);
981 addReply(c,shared.cone);
982 signalModifiedKey(c->db,key);
983 notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
984 server.dirty++;
985 return;
986 }
987 }
988
/* EXPIRE: argv[2] is in seconds, relative to the current time. */
void expireCommand(client *c) {
    long long basetime = mstime();

    expireGenericCommand(c,basetime,UNIT_SECONDS);
}
992
/* EXPIREAT: argv[2] is in seconds and is used as is (base time 0). */
void expireatCommand(client *c) {
    const long long basetime = 0;

    expireGenericCommand(c,basetime,UNIT_SECONDS);
}
996
/* PEXPIRE: argv[2] is in milliseconds, relative to the current time. */
void pexpireCommand(client *c) {
    long long basetime = mstime();

    expireGenericCommand(c,basetime,UNIT_MILLISECONDS);
}
1000
/* PEXPIREAT: argv[2] is in milliseconds and is used as is (base time 0). */
void pexpireatCommand(client *c) {
    const long long basetime = 0;

    expireGenericCommand(c,basetime,UNIT_MILLISECONDS);
}
1004
/* Shared implementation of the TTL style commands. The reply is:
 *
 * -2 if the key at argv[1] does not exist.
 * -1 if the key exists but has no expire set.
 * Otherwise the remaining time to live, never negative, expressed in
 * milliseconds when 'output_ms' is non-zero, or in seconds (rounded to the
 * nearest second) when it is zero. */
void ttlGenericCommand(client *c, int output_ms) {
    robj *key = c->argv[1];
    long long expire, remaining;

    /* If the key does not exist at all, return -2 */
    if (lookupKeyReadWithFlags(c->db,key,LOOKUP_NOTOUCH) == NULL) {
        addReplyLongLong(c,-2);
        return;
    }

    /* The key exists: -1 is the reply when there is no expire set. */
    expire = getExpire(c->db,key);
    if (expire == -1) {
        addReplyLongLong(c,-1);
        return;
    }

    /* An expire already in the past is reported as zero. */
    remaining = expire-mstime();
    if (remaining < 0) remaining = 0;

    if (output_ms) {
        addReplyLongLong(c,remaining);
    } else {
        addReplyLongLong(c,(remaining+500)/1000);
    }
}
1026
/* TTL: report the remaining time to live in seconds. */
void ttlCommand(client *c) {
    const int output_ms = 0;

    ttlGenericCommand(c, output_ms);
}
1030
/* PTTL: report the remaining time to live in milliseconds. */
void pttlCommand(client *c) {
    const int output_ms = 1;

    ttlGenericCommand(c, output_ms);
}
1034
/* PERSIST: remove the expire from the key at argv[1].
 *
 * The reply is 1 if an expire was removed, 0 if the key is not in the
 * database or removeExpire() reported that nothing was removed. */
void persistCommand(client *c) {
    robj *key = c->argv[1];

    if (dictFind(c->db->dict,key->ptr) == NULL ||
        !removeExpire(c->db,key))
    {
        addReply(c,shared.czero);
        return;
    }

    addReply(c,shared.cone);
    server.dirty++;
}
1050
1051 /* TOUCH key1 [key2 key3 ... keyN] */
touchCommand(client * c)1052 void touchCommand(client *c) {
1053 int touched = 0;
1054 for (int j = 1; j < c->argc; j++)
1055 if (lookupKeyRead(c->db,c->argv[j]) != NULL) touched++;
1056 addReplyLongLong(c,touched);
1057 }
1058
1059 /* -----------------------------------------------------------------------------
1060 * API to get key arguments from commands
1061 * ---------------------------------------------------------------------------*/
1062
1063 /* The base case is to use the keys position as given in the command table
1064 * (firstkey, lastkey, step). */
getKeysUsingCommandTable(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1065 int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, int *numkeys) {
1066 int j, i = 0, last, *keys;
1067 UNUSED(argv);
1068
1069 if (cmd->firstkey == 0) {
1070 *numkeys = 0;
1071 return NULL;
1072 }
1073 last = cmd->lastkey;
1074 if (last < 0) last = argc+last;
1075 keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
1076 for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
1077 serverAssert(j < argc);
1078 keys[i++] = j;
1079 }
1080 *numkeys = i;
1081 return keys;
1082 }
1083
1084 /* Return all the arguments that are keys in the command passed via argc / argv.
1085 *
1086 * The command returns the positions of all the key arguments inside the array,
1087 * so the actual return value is an heap allocated array of integers. The
1088 * length of the array is returned by reference into *numkeys.
1089 *
1090 * 'cmd' must be point to the corresponding entry into the redisCommand
1091 * table, according to the command name in argv[0].
1092 *
1093 * This function uses the command table if a command-specific helper function
1094 * is not required, otherwise it calls the command-specific function. */
getKeysFromCommand(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1095 int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1096 if (cmd->getkeys_proc) {
1097 return cmd->getkeys_proc(cmd,argv,argc,numkeys);
1098 } else {
1099 return getKeysUsingCommandTable(cmd,argv,argc,numkeys);
1100 }
1101 }
1102
/* Release the positions array obtained from getKeysFromCommand(): the
 * pointer is simply handed to zfree(). */
void getKeysFreeResult(int *result) {
    int *positions = result;

    zfree(positions);
}
1107
1108 /* Helper function to extract keys from following commands:
1109 * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
1110 * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */
zunionInterGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1111 int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1112 int i, num, *keys;
1113 UNUSED(cmd);
1114
1115 num = atoi(argv[2]->ptr);
1116 /* Sanity check. Don't return any key if the command is going to
1117 * reply with syntax error. */
1118 if (num > (argc-3)) {
1119 *numkeys = 0;
1120 return NULL;
1121 }
1122
1123 /* Keys in z{union,inter}store come from two places:
1124 * argv[1] = storage key,
1125 * argv[3...n] = keys to intersect */
1126 keys = zmalloc(sizeof(int)*(num+1));
1127
1128 /* Add all key positions for argv[3...n] to keys[] */
1129 for (i = 0; i < num; i++) keys[i] = 3+i;
1130
1131 /* Finally add the argv[1] key position (the storage key target). */
1132 keys[num] = 1;
1133 *numkeys = num+1; /* Total keys = {union,inter} keys + storage key */
1134 return keys;
1135 }
1136
1137 /* Helper function to extract keys from the following commands:
1138 * EVAL <script> <num-keys> <key> <key> ... <key> [more stuff]
1139 * EVALSHA <script> <num-keys> <key> <key> ... <key> [more stuff] */
evalGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1140 int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1141 int i, num, *keys;
1142 UNUSED(cmd);
1143
1144 num = atoi(argv[2]->ptr);
1145 /* Sanity check. Don't return any key if the command is going to
1146 * reply with syntax error. */
1147 if (num > (argc-3)) {
1148 *numkeys = 0;
1149 return NULL;
1150 }
1151
1152 keys = zmalloc(sizeof(int)*num);
1153 *numkeys = num;
1154
1155 /* Add all key positions for argv[3...n] to keys[] */
1156 for (i = 0; i < num; i++) keys[i] = 3+i;
1157
1158 return keys;
1159 }
1160
1161 /* Helper function to extract keys from the SORT command.
1162 *
1163 * SORT <sort-key> ... STORE <store-key> ...
1164 *
1165 * The first argument of SORT is always a key, however a list of options
1166 * follow in SQL-alike style. Here we parse just the minimum in order to
1167 * correctly identify keys in the "STORE" option. */
sortGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1168 int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1169 int i, j, num, *keys, found_store = 0;
1170 UNUSED(cmd);
1171
1172 num = 0;
1173 keys = zmalloc(sizeof(int)*2); /* Alloc 2 places for the worst case. */
1174
1175 keys[num++] = 1; /* <sort-key> is always present. */
1176
1177 /* Search for STORE option. By default we consider options to don't
1178 * have arguments, so if we find an unknown option name we scan the
1179 * next. However there are options with 1 or 2 arguments, so we
1180 * provide a list here in order to skip the right number of args. */
1181 struct {
1182 char *name;
1183 int skip;
1184 } skiplist[] = {
1185 {"limit", 2},
1186 {"get", 1},
1187 {"by", 1},
1188 {NULL, 0} /* End of elements. */
1189 };
1190
1191 for (i = 2; i < argc; i++) {
1192 for (j = 0; skiplist[j].name != NULL; j++) {
1193 if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) {
1194 i += skiplist[j].skip;
1195 break;
1196 } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) {
1197 /* Note: we don't increment "num" here and continue the loop
1198 * to be sure to process the *last* "STORE" option if multiple
1199 * ones are provided. This is same behavior as SORT. */
1200 found_store = 1;
1201 keys[num] = i+1; /* <store-key> */
1202 break;
1203 }
1204 }
1205 }
1206 *numkeys = num + found_store;
1207 return keys;
1208 }
1209
/* Helper function to extract keys from the MIGRATE command.
 *
 * By default the only key is the argument at position 3. However if a
 * "keys" option is found starting from argv[6], and argv[3] is the empty
 * string, the keys are all the arguments following the option.
 *
 * The returned array of positions is heap allocated, and its length is
 * stored into *numkeys. */
int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
    int idx, *positions;
    int first = 3; /* Assume the obvious form. */
    int num = 1;
    UNUSED(cmd);

    /* But check for the extended one with the KEYS option. Nothing is
     * scanned when there are no arguments at position 6 or later. */
    for (idx = 6; idx < argc; idx++) {
        if (strcasecmp(argv[idx]->ptr,"keys") != 0) continue;
        if (sdslen(argv[3]->ptr) != 0) continue;
        first = idx+1;
        num = argc-first;
        break;
    }

    positions = zmalloc(sizeof(int)*num);
    for (idx = 0; idx < num; idx++) positions[idx] = first+idx;
    *numkeys = num;
    return positions;
}
1236
1237 /* Slot to Key API. This is used by Redis Cluster in order to obtain in
1238 * a fast way a key that belongs to a specified hash slot. This is useful
1239 * while rehashing the cluster. */
slotToKeyAdd(robj * key)1240 void slotToKeyAdd(robj *key) {
1241 unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
1242
1243 zslInsert(server.cluster->slots_to_keys,hashslot,key);
1244 incrRefCount(key);
1245 }
1246
/* Delete 'key' from the slots_to_keys skiplist, looking it up by the hash
 * slot computed from the key name. */
void slotToKeyDel(robj *key) {
    unsigned int slot;

    slot = keyHashSlot(key->ptr,sdslen(key->ptr));
    zslDelete(server.cluster->slots_to_keys,slot,key);
}
1252
slotToKeyFlush(void)1253 void slotToKeyFlush(void) {
1254 zslFree(server.cluster->slots_to_keys);
1255 server.cluster->slots_to_keys = zslCreate();
1256 }
1257
/* Store into 'keys' up to 'count' key objects found in the slots_to_keys
 * skiplist with a score equal to 'hashslot'. The objects are stored as
 * they are, without taking references. The number of stored items is
 * returned. */
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
    zrangespec range;
    zskiplistNode *node;
    int stored = 0;

    /* Inclusive range selecting just the requested slot. */
    range.min = range.max = hashslot;
    range.minex = range.maxex = 0;

    for (node = zslFirstInRange(server.cluster->slots_to_keys, &range);
         node && node->score == hashslot && count--;
         node = node->level[0].forward)
    {
        keys[stored++] = node->obj;
    }
    return stored;
}
1273
1274 /* Remove all the keys in the specified hash slot.
1275 * The number of removed items is returned. */
delKeysInSlot(unsigned int hashslot)1276 unsigned int delKeysInSlot(unsigned int hashslot) {
1277 zskiplistNode *n;
1278 zrangespec range;
1279 int j = 0;
1280
1281 range.min = range.max = hashslot;
1282 range.minex = range.maxex = 0;
1283
1284 n = zslFirstInRange(server.cluster->slots_to_keys, &range);
1285 while(n && n->score == hashslot) {
1286 robj *key = n->obj;
1287 n = n->level[0].forward; /* Go to the next item before freeing it. */
1288 incrRefCount(key); /* Protect the object while freeing it. */
1289 dbDelete(&server.db[0],key);
1290 decrRefCount(key);
1291 j++;
1292 }
1293 return j;
1294 }
1295
countKeysInSlot(unsigned int hashslot)1296 unsigned int countKeysInSlot(unsigned int hashslot) {
1297 zskiplist *zsl = server.cluster->slots_to_keys;
1298 zskiplistNode *zn;
1299 zrangespec range;
1300 int rank, count = 0;
1301
1302 range.min = range.max = hashslot;
1303 range.minex = range.maxex = 0;
1304
1305 /* Find first element in range */
1306 zn = zslFirstInRange(zsl, &range);
1307
1308 /* Use rank of first element, if any, to determine preliminary count */
1309 if (zn != NULL) {
1310 rank = zslGetRank(zsl, zn->score, zn->obj);
1311 count = (zsl->length - (rank - 1));
1312
1313 /* Find last element in range */
1314 zn = zslLastInRange(zsl, &range);
1315
1316 /* Use rank of last element, if any, to determine the actual count */
1317 if (zn != NULL) {
1318 rank = zslGetRank(zsl, zn->score, zn->obj);
1319 count -= (zsl->length - rank);
1320 }
1321 }
1322 return count;
1323 }
1324