xref: /redis-3.2.3/src/db.c (revision b23aa670)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "cluster.h"
32 
33 #include <signal.h>
34 #include <ctype.h>
35 
36 void slotToKeyAdd(robj *key);
37 void slotToKeyDel(robj *key);
38 void slotToKeyFlush(void);
39 
40 /*-----------------------------------------------------------------------------
41  * C-level DB API
42  *----------------------------------------------------------------------------*/
43 
44 /* Low level key lookup API, not actually called directly from commands
45  * implementations that should instead rely on lookupKeyRead(),
46  * lookupKeyWrite() and lookupKeyReadWithFlags(). */
lookupKey(redisDb * db,robj * key,int flags)47 robj *lookupKey(redisDb *db, robj *key, int flags) {
48     dictEntry *de = dictFind(db->dict,key->ptr);
49     if (de) {
50         robj *val = dictGetVal(de);
51 
52         /* Update the access time for the ageing algorithm.
53          * Don't do it if we have a saving child, as this will trigger
54          * a copy on write madness. */
55         if (server.rdb_child_pid == -1 &&
56             server.aof_child_pid == -1 &&
57             !(flags & LOOKUP_NOTOUCH))
58         {
59             val->lru = LRU_CLOCK();
60         }
61         return val;
62     } else {
63         return NULL;
64     }
65 }
66 
67 /* Lookup a key for read operations, or return NULL if the key is not found
68  * in the specified DB.
69  *
70  * As a side effect of calling this function:
71  * 1. A key gets expired if it reached it's TTL.
72  * 2. The key last access time is updated.
73  * 3. The global keys hits/misses stats are updated (reported in INFO).
74  *
75  * This API should not be used when we write to the key after obtaining
76  * the object linked to the key, but only for read only operations.
77  *
78  * Flags change the behavior of this command:
79  *
80  *  LOOKUP_NONE (or zero): no special flags are passed.
81  *  LOOKUP_NOTOUCH: don't alter the last access time of the key.
82  *
83  * Note: this function also returns NULL is the key is logically expired
84  * but still existing, in case this is a slave, since this API is called only
85  * for read operations. Even if the key expiry is master-driven, we can
86  * correctly report a key is expired on slaves even if the master is lagging
87  * expiring our key via DELs in the replication link. */
lookupKeyReadWithFlags(redisDb * db,robj * key,int flags)88 robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
89     robj *val;
90 
91     if (expireIfNeeded(db,key) == 1) {
92         /* Key expired. If we are in the context of a master, expireIfNeeded()
93          * returns 0 only when the key does not exist at all, so it's save
94          * to return NULL ASAP. */
95         if (server.masterhost == NULL) return NULL;
96 
97         /* However if we are in the context of a slave, expireIfNeeded() will
98          * not really try to expire the key, it only returns information
99          * about the "logical" status of the key: key expiring is up to the
100          * master in order to have a consistent view of master's data set.
101          *
102          * However, if the command caller is not the master, and as additional
103          * safety measure, the command invoked is a read-only command, we can
104          * safely return NULL here, and provide a more consistent behavior
105          * to clients accessign expired values in a read-only fashion, that
106          * will say the key as non exisitng.
107          *
108          * Notably this covers GETs when slaves are used to scale reads. */
109         if (server.current_client &&
110             server.current_client != server.master &&
111             server.current_client->cmd &&
112             server.current_client->cmd->flags & CMD_READONLY)
113         {
114             return NULL;
115         }
116     }
117     val = lookupKey(db,key,flags);
118     if (val == NULL)
119         server.stat_keyspace_misses++;
120     else
121         server.stat_keyspace_hits++;
122     return val;
123 }
124 
125 /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
126  * common case. */
lookupKeyRead(redisDb * db,robj * key)127 robj *lookupKeyRead(redisDb *db, robj *key) {
128     return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
129 }
130 
131 /* Lookup a key for write operations, and as a side effect, if needed, expires
132  * the key if its TTL is reached.
133  *
134  * Returns the linked value object if the key exists or NULL if the key
135  * does not exist in the specified DB. */
lookupKeyWrite(redisDb * db,robj * key)136 robj *lookupKeyWrite(redisDb *db, robj *key) {
137     expireIfNeeded(db,key);
138     return lookupKey(db,key,LOOKUP_NONE);
139 }
140 
/* Read lookup of 'key' in the DB selected by client 'c'. When the key is
 * missing, 'reply' is sent to the client and NULL is returned, otherwise
 * the value object is returned and nothing is sent. */
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
    robj *val = lookupKeyRead(c->db, key);

    if (val != NULL) return val;
    addReply(c,reply);
    return NULL;
}
146 
/* Write lookup of 'key' in the DB selected by client 'c'. When the key is
 * missing, 'reply' is sent to the client and NULL is returned, otherwise
 * the value object is returned and nothing is sent. */
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
    robj *val = lookupKeyWrite(c->db, key);

    if (val != NULL) return val;
    addReply(c,reply);
    return NULL;
}
152 
153 /* Add the key to the DB. It's up to the caller to increment the reference
154  * counter of the value if needed.
155  *
156  * The program is aborted if the key already exists. */
dbAdd(redisDb * db,robj * key,robj * val)157 void dbAdd(redisDb *db, robj *key, robj *val) {
158     sds copy = sdsdup(key->ptr);
159     int retval = dictAdd(db->dict, copy, val);
160 
161     serverAssertWithInfo(NULL,key,retval == DICT_OK);
162     if (val->type == OBJ_LIST) signalListAsReady(db, key);
163     if (server.cluster_enabled) slotToKeyAdd(key);
164  }
165 
166 /* Overwrite an existing key with a new value. Incrementing the reference
167  * count of the new value is up to the caller.
168  * This function does not modify the expire time of the existing key.
169  *
170  * The program is aborted if the key was not already present. */
dbOverwrite(redisDb * db,robj * key,robj * val)171 void dbOverwrite(redisDb *db, robj *key, robj *val) {
172     dictEntry *de = dictFind(db->dict,key->ptr);
173 
174     serverAssertWithInfo(NULL,key,de != NULL);
175     dictReplace(db->dict, key->ptr, val);
176 }
177 
178 /* High level Set operation. This function can be used in order to set
179  * a key, whatever it was existing or not, to a new object.
180  *
181  * 1) The ref count of the value object is incremented.
182  * 2) clients WATCHing for the destination key notified.
183  * 3) The expire time of the key is reset (the key is made persistent). */
setKey(redisDb * db,robj * key,robj * val)184 void setKey(redisDb *db, robj *key, robj *val) {
185     if (lookupKeyWrite(db,key) == NULL) {
186         dbAdd(db,key,val);
187     } else {
188         dbOverwrite(db,key,val);
189     }
190     incrRefCount(val);
191     removeExpire(db,key);
192     signalModifiedKey(db,key);
193 }
194 
/* Return 1 if 'key' has an entry in the main dictionary of 'db', 0
 * otherwise. No expire check is performed here. */
int dbExists(redisDb *db, robj *key) {
    dictEntry *de = dictFind(db->dict,key->ptr);

    return de != NULL;
}
198 
199 /* Return a random key, in form of a Redis object.
200  * If there are no keys, NULL is returned.
201  *
202  * The function makes sure to return keys not already expired. */
dbRandomKey(redisDb * db)203 robj *dbRandomKey(redisDb *db) {
204     dictEntry *de;
205 
206     while(1) {
207         sds key;
208         robj *keyobj;
209 
210         de = dictGetRandomKey(db->dict);
211         if (de == NULL) return NULL;
212 
213         key = dictGetKey(de);
214         keyobj = createStringObject(key,sdslen(key));
215         if (dictFind(db->expires,key)) {
216             if (expireIfNeeded(db,keyobj)) {
217                 decrRefCount(keyobj);
218                 continue; /* search for another key. This expired. */
219             }
220         }
221         return keyobj;
222     }
223 }
224 
225 /* Delete a key, value, and associated expiration entry if any, from the DB */
dbDelete(redisDb * db,robj * key)226 int dbDelete(redisDb *db, robj *key) {
227     /* Deleting an entry from the expires dict will not free the sds of
228      * the key, because it is shared with the main dictionary. */
229     if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
230     if (dictDelete(db->dict,key->ptr) == DICT_OK) {
231         if (server.cluster_enabled) slotToKeyDel(key);
232         return 1;
233     } else {
234         return 0;
235     }
236 }
237 
238 /* Prepare the string object stored at 'key' to be modified destructively
239  * to implement commands like SETBIT or APPEND.
240  *
241  * An object is usually ready to be modified unless one of the two conditions
242  * are true:
243  *
244  * 1) The object 'o' is shared (refcount > 1), we don't want to affect
245  *    other users.
246  * 2) The object encoding is not "RAW".
247  *
248  * If the object is found in one of the above conditions (or both) by the
249  * function, an unshared / not-encoded copy of the string object is stored
250  * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
251  * returned.
252  *
253  * USAGE:
254  *
255  * The object 'o' is what the caller already obtained by looking up 'key'
256  * in 'db', the usage pattern looks like this:
257  *
258  * o = lookupKeyWrite(db,key);
259  * if (checkType(c,o,OBJ_STRING)) return;
260  * o = dbUnshareStringValue(db,key,o);
261  *
262  * At this point the caller is ready to modify the object, for example
263  * using an sdscat() call to append some data, or anything else.
264  */
dbUnshareStringValue(redisDb * db,robj * key,robj * o)265 robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
266     serverAssert(o->type == OBJ_STRING);
267     if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
268         robj *decoded = getDecodedObject(o);
269         o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
270         decrRefCount(decoded);
271         dbOverwrite(db,key,o);
272     }
273     return o;
274 }
275 
emptyDb(void (callback)(void *))276 long long emptyDb(void(callback)(void*)) {
277     int j;
278     long long removed = 0;
279 
280     for (j = 0; j < server.dbnum; j++) {
281         removed += dictSize(server.db[j].dict);
282         dictEmpty(server.db[j].dict,callback);
283         dictEmpty(server.db[j].expires,callback);
284     }
285     if (server.cluster_enabled) slotToKeyFlush();
286     return removed;
287 }
288 
/* Make the database with index 'id' the current DB of client 'c'.
 * Returns C_OK on success, or C_ERR (leaving the client untouched) when
 * 'id' is outside the 0 .. server.dbnum-1 range. */
int selectDb(client *c, int id) {
    int valid = (id >= 0 && id < server.dbnum);

    if (!valid) return C_ERR;
    c->db = server.db + id;
    return C_OK;
}
295 
296 /*-----------------------------------------------------------------------------
297  * Hooks for key space changes.
298  *
299  * Every time a key in the database is modified the function
300  * signalModifiedKey() is called.
301  *
302  * Every time a DB is flushed the function signalFlushedDb() is called.
303  *----------------------------------------------------------------------------*/
304 
/* Hook to call every time 'key' is modified in 'db'. It currently only
 * forwards the event to touchWatchedKey(). */
void signalModifiedKey(redisDb *db, robj *key)
{
    touchWatchedKey(db, key);
}
308 
/* Hook to call every time a DB is flushed. 'dbid' is the index of the
 * flushed database (FLUSHALL passes -1). It currently only forwards the
 * event to touchWatchedKeysOnFlush(). */
void signalFlushedDb(int dbid)
{
    touchWatchedKeysOnFlush(dbid);
}
312 
313 /*-----------------------------------------------------------------------------
314  * Type agnostic commands operating on the key space
315  *----------------------------------------------------------------------------*/
316 
/* FLUSHDB: remove every key from the DB currently selected by the client.
 * The dirty counter is incremented by the number of keys removed. */
void flushdbCommand(client *c) {
    redisDb *db = c->db;

    server.dirty += dictSize(db->dict);
    signalFlushedDb(db->id);

    dictEmpty(db->dict,NULL);
    dictEmpty(db->expires,NULL);
    if (server.cluster_enabled) {
        slotToKeyFlush();
    }
    addReply(c,shared.ok);
}
325 
/* FLUSHALL: remove every key from every database.
 *
 * After replying, a running RDB child is sent SIGUSR1 and its temp file is
 * removed. If save points are configured, the now empty dataset is saved
 * with rdbSave(). */
void flushallCommand(client *c) {
    signalFlushedDb(-1);
    server.dirty += emptyDb(NULL);
    addReply(c,shared.ok);
    if (server.rdb_child_pid != -1) {
        kill(server.rdb_child_pid,SIGUSR1);
        rdbRemoveTempFile(server.rdb_child_pid);
    }
    if (server.saveparamslen > 0) {
        /* Normally rdbSave() will reset dirty, but we don't want this here
         * as otherwise FLUSHALL will not be replicated nor put into the AOF.
         *
         * The counter is saved in a long long so that a large value is not
         * truncated when it is restored. */
        long long saved_dirty = server.dirty;
        rdbSave(server.rdb_filename);
        server.dirty = saved_dirty;
    }
    /* Count the FLUSHALL itself, even when there were no keys to remove. */
    server.dirty++;
}
343 
/* DEL key1 key2 ... key_N.
 * Every key is first expired if needed, then deleted. For each key actually
 * removed, WATCHing clients are signaled, a "del" keyspace event is fired
 * and the dirty counter is incremented. The reply is the number of keys
 * removed. */
void delCommand(client *c) {
    int deleted = 0;
    int j;

    for (j = 1; j < c->argc; j++) {
        robj *key = c->argv[j];

        expireIfNeeded(c->db,key);
        if (!dbDelete(c->db,key)) continue;

        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        server.dirty++;
        deleted++;
    }
    addReplyLongLong(c,deleted);
}
359 
360 /* EXISTS key1 key2 ... key_N.
361  * Return value is the number of keys existing. */
existsCommand(client * c)362 void existsCommand(client *c) {
363     long long count = 0;
364     int j;
365 
366     for (j = 1; j < c->argc; j++) {
367         expireIfNeeded(c->db,c->argv[j]);
368         if (dbExists(c->db,c->argv[j])) count++;
369     }
370     addReplyLongLong(c,count);
371 }
372 
/* SELECT index.
 * Switch the client to the database with the given index. In cluster mode
 * only index 0 is accepted. */
void selectCommand(client *c) {
    long id;

    if (getLongFromObjectOrReply(c, c->argv[1], &id,
        "invalid DB index") != C_OK)
        return;

    if (server.cluster_enabled && id != 0) {
        addReplyError(c,"SELECT is not allowed in cluster mode");
        return;
    }

    /* selectDb() takes an int: reject values that do not fit before the
     * conversion, otherwise an out of range index could be silently
     * truncated into a valid one. */
    if (id < INT_MIN || id > INT_MAX || selectDb(c,(int)id) == C_ERR) {
        addReplyError(c,"invalid DB index");
    } else {
        addReply(c,shared.ok);
    }
}
390 
/* RANDOMKEY: reply with the name of a random key of the current DB, or
 * with a null bulk when dbRandomKey() finds no key to return. */
void randomkeyCommand(client *c) {
    robj *key = dbRandomKey(c->db);

    if (key != NULL) {
        addReplyBulk(c,key);
        /* dbRandomKey() returned a new object: release it. */
        decrRefCount(key);
    } else {
        addReply(c,shared.nullbulk);
    }
}
402 
/* KEYS pattern.
 * Reply with the names of all the keys of the current DB matching the glob
 * style pattern, skipping the keys found to be expired. The length of the
 * multi bulk reply is not known in advance, so it is set at the end. */
void keysCommand(client *c) {
    sds pattern = c->argv[1]->ptr;
    int plen = sdslen(pattern);
    int allkeys;
    unsigned long numkeys = 0;
    void *replylen = addDeferredMultiBulkLength(c);
    dictIterator *di = dictGetSafeIterator(c->db->dict);
    dictEntry *de;

    /* The pattern "*" matches everything: avoid the matching work. */
    allkeys = (pattern[0] == '*' && pattern[1] == '\0');

    for (de = dictNext(di); de != NULL; de = dictNext(di)) {
        sds key = dictGetKey(de);
        robj *keyobj;

        if (!allkeys && !stringmatchlen(pattern,plen,key,sdslen(key),0))
            continue;

        keyobj = createStringObject(key,sdslen(key));
        if (expireIfNeeded(c->db,keyobj) == 0) {
            addReplyBulk(c,keyobj);
            numkeys++;
        }
        decrRefCount(keyobj);
    }
    dictReleaseIterator(di);
    setDeferredMultiBulkLength(c,replylen,numkeys);
}
429 
430 /* This callback is used by scanGenericCommand in order to collect elements
431  * returned by the dictionary iterator into a list. */
scanCallback(void * privdata,const dictEntry * de)432 void scanCallback(void *privdata, const dictEntry *de) {
433     void **pd = (void**) privdata;
434     list *keys = pd[0];
435     robj *o = pd[1];
436     robj *key, *val = NULL;
437 
438     if (o == NULL) {
439         sds sdskey = dictGetKey(de);
440         key = createStringObject(sdskey, sdslen(sdskey));
441     } else if (o->type == OBJ_SET) {
442         key = dictGetKey(de);
443         incrRefCount(key);
444     } else if (o->type == OBJ_HASH) {
445         key = dictGetKey(de);
446         incrRefCount(key);
447         val = dictGetVal(de);
448         incrRefCount(val);
449     } else if (o->type == OBJ_ZSET) {
450         key = dictGetKey(de);
451         incrRefCount(key);
452         val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
453     } else {
454         serverPanic("Type not handled in SCAN callback.");
455     }
456 
457     listAddNodeTail(keys, key);
458     if (val) listAddNodeTail(keys, val);
459 }
460 
461 /* Try to parse a SCAN cursor stored at object 'o':
462  * if the cursor is valid, store it as unsigned integer into *cursor and
463  * returns C_OK. Otherwise return C_ERR and send an error to the
464  * client. */
parseScanCursorOrReply(client * c,robj * o,unsigned long * cursor)465 int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
466     char *eptr;
467 
468     /* Use strtoul() because we need an *unsigned* long, so
469      * getLongLongFromObject() does not cover the whole cursor space. */
470     errno = 0;
471     *cursor = strtoul(o->ptr, &eptr, 10);
472     if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE)
473     {
474         addReplyError(c, "invalid cursor");
475         return C_ERR;
476     }
477     return C_OK;
478 }
479 
480 /* This command implements SCAN, HSCAN and SSCAN commands.
481  * If object 'o' is passed, then it must be a Hash or Set object, otherwise
482  * if 'o' is NULL the command will operate on the dictionary associated with
483  * the current database.
484  *
485  * When 'o' is not NULL the function assumes that the first argument in
486  * the client arguments vector is a key so it skips it before iterating
487  * in order to parse options.
488  *
489  * In the case of a Hash object the function returns both the field and value
490  * of every element on the Hash. */
scanGenericCommand(client * c,robj * o,unsigned long cursor)491 void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
492     int i, j;
493     list *keys = listCreate();
494     listNode *node, *nextnode;
495     long count = 10;
496     sds pat = NULL;
497     int patlen = 0, use_pattern = 0;
498     dict *ht;
499 
500     /* Object must be NULL (to iterate keys names), or the type of the object
501      * must be Set, Sorted Set, or Hash. */
502     serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
503                 o->type == OBJ_ZSET);
504 
505     /* Set i to the first option argument. The previous one is the cursor. */
506     i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
507 
508     /* Step 1: Parse options. */
509     while (i < c->argc) {
510         j = c->argc - i;
511         if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
512             if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
513                 != C_OK)
514             {
515                 goto cleanup;
516             }
517 
518             if (count < 1) {
519                 addReply(c,shared.syntaxerr);
520                 goto cleanup;
521             }
522 
523             i += 2;
524         } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
525             pat = c->argv[i+1]->ptr;
526             patlen = sdslen(pat);
527 
528             /* The pattern always matches if it is exactly "*", so it is
529              * equivalent to disabling it. */
530             use_pattern = !(pat[0] == '*' && patlen == 1);
531 
532             i += 2;
533         } else {
534             addReply(c,shared.syntaxerr);
535             goto cleanup;
536         }
537     }
538 
539     /* Step 2: Iterate the collection.
540      *
541      * Note that if the object is encoded with a ziplist, intset, or any other
542      * representation that is not a hash table, we are sure that it is also
543      * composed of a small number of elements. So to avoid taking state we
544      * just return everything inside the object in a single call, setting the
545      * cursor to zero to signal the end of the iteration. */
546 
547     /* Handle the case of a hash table. */
548     ht = NULL;
549     if (o == NULL) {
550         ht = c->db->dict;
551     } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
552         ht = o->ptr;
553     } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
554         ht = o->ptr;
555         count *= 2; /* We return key / value for this type. */
556     } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
557         zset *zs = o->ptr;
558         ht = zs->dict;
559         count *= 2; /* We return key / value for this type. */
560     }
561 
562     if (ht) {
563         void *privdata[2];
564         /* We set the max number of iterations to ten times the specified
565          * COUNT, so if the hash table is in a pathological state (very
566          * sparsely populated) we avoid to block too much time at the cost
567          * of returning no or very few elements. */
568         long maxiterations = count*10;
569 
570         /* We pass two pointers to the callback: the list to which it will
571          * add new elements, and the object containing the dictionary so that
572          * it is possible to fetch more data in a type-dependent way. */
573         privdata[0] = keys;
574         privdata[1] = o;
575         do {
576             cursor = dictScan(ht, cursor, scanCallback, privdata);
577         } while (cursor &&
578               maxiterations-- &&
579               listLength(keys) < (unsigned long)count);
580     } else if (o->type == OBJ_SET) {
581         int pos = 0;
582         int64_t ll;
583 
584         while(intsetGet(o->ptr,pos++,&ll))
585             listAddNodeTail(keys,createStringObjectFromLongLong(ll));
586         cursor = 0;
587     } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
588         unsigned char *p = ziplistIndex(o->ptr,0);
589         unsigned char *vstr;
590         unsigned int vlen;
591         long long vll;
592 
593         while(p) {
594             ziplistGet(p,&vstr,&vlen,&vll);
595             listAddNodeTail(keys,
596                 (vstr != NULL) ? createStringObject((char*)vstr,vlen) :
597                                  createStringObjectFromLongLong(vll));
598             p = ziplistNext(o->ptr,p);
599         }
600         cursor = 0;
601     } else {
602         serverPanic("Not handled encoding in SCAN.");
603     }
604 
605     /* Step 3: Filter elements. */
606     node = listFirst(keys);
607     while (node) {
608         robj *kobj = listNodeValue(node);
609         nextnode = listNextNode(node);
610         int filter = 0;
611 
612         /* Filter element if it does not match the pattern. */
613         if (!filter && use_pattern) {
614             if (sdsEncodedObject(kobj)) {
615                 if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
616                     filter = 1;
617             } else {
618                 char buf[LONG_STR_SIZE];
619                 int len;
620 
621                 serverAssert(kobj->encoding == OBJ_ENCODING_INT);
622                 len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
623                 if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
624             }
625         }
626 
627         /* Filter element if it is an expired key. */
628         if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;
629 
630         /* Remove the element and its associted value if needed. */
631         if (filter) {
632             decrRefCount(kobj);
633             listDelNode(keys, node);
634         }
635 
636         /* If this is a hash or a sorted set, we have a flat list of
637          * key-value elements, so if this element was filtered, remove the
638          * value, or skip it if it was not filtered: we only match keys. */
639         if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
640             node = nextnode;
641             nextnode = listNextNode(node);
642             if (filter) {
643                 kobj = listNodeValue(node);
644                 decrRefCount(kobj);
645                 listDelNode(keys, node);
646             }
647         }
648         node = nextnode;
649     }
650 
651     /* Step 4: Reply to the client. */
652     addReplyMultiBulkLen(c, 2);
653     addReplyBulkLongLong(c,cursor);
654 
655     addReplyMultiBulkLen(c, listLength(keys));
656     while ((node = listFirst(keys)) != NULL) {
657         robj *kobj = listNodeValue(node);
658         addReplyBulk(c, kobj);
659         decrRefCount(kobj);
660         listDelNode(keys, node);
661     }
662 
663 cleanup:
664     listSetFreeMethod(keys,decrRefCountVoid);
665     listRelease(keys);
666 }
667 
668 /* The SCAN command completely relies on scanGenericCommand. */
scanCommand(client * c)669 void scanCommand(client *c) {
670     unsigned long cursor;
671     if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
672     scanGenericCommand(c,NULL,cursor);
673 }
674 
/* DBSIZE: reply with the number of entries in the main dictionary of the
 * DB currently selected by the client. */
void dbsizeCommand(client *c)
{
    addReplyLongLong(c, dictSize(c->db->dict));
}
678 
/* LASTSAVE: reply with the value of server.lastsave as an integer. */
void lastsaveCommand(client *c)
{
    addReplyLongLong(c, server.lastsave);
}
682 
/* TYPE key.
 * Reply with a status naming the type of the value stored at key, or
 * "none" when the key does not exist. The lookup uses LOOKUP_NOTOUCH, so
 * the last access time of the key is not altered. */
void typeCommand(client *c) {
    robj *o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);
    char *type;

    if (o == NULL)
        type = "none";
    else if (o->type == OBJ_STRING)
        type = "string";
    else if (o->type == OBJ_LIST)
        type = "list";
    else if (o->type == OBJ_SET)
        type = "set";
    else if (o->type == OBJ_ZSET)
        type = "zset";
    else if (o->type == OBJ_HASH)
        type = "hash";
    else
        type = "unknown";

    addReplyStatus(c,type);
}
702 
/* SHUTDOWN [NOSAVE|SAVE].
 * On success the process exits and no reply is sent. If
 * prepareForShutdown() fails an error is returned to the client. */
void shutdownCommand(client *c) {
    int flags = 0;

    if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    }
    if (c->argc == 2) {
        char *opt = c->argv[1]->ptr;

        if (strcasecmp(opt,"nosave") == 0) {
            flags |= SHUTDOWN_NOSAVE;
        } else if (strcasecmp(opt,"save") == 0) {
            flags |= SHUTDOWN_SAVE;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    /* If SHUTDOWN arrives while a dataset is being loaded in memory, no
     * save must be attempted on shutdown, otherwise the current DB could
     * be overwritten with half-read data.
     *
     * In Sentinel mode as well the SAVE flag is cleared and NOSAVE is
     * forced. */
    if (server.loading || server.sentinel_mode) {
        flags &= ~SHUTDOWN_SAVE;
        flags |= SHUTDOWN_NOSAVE;
    }

    if (prepareForShutdown(flags) == C_OK) exit(0);
    addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
}
730 
/* Implementation of RENAME (nx == 0) and RENAMENX (nx != 0).
 *
 * The value and the expire of the source key are moved to the destination
 * key. With 'nx' set nothing is done if the destination already exists and
 * the reply is 0, otherwise the reply is 1. Without 'nx' an existing
 * destination is overwritten and the reply is OK. A missing source key is
 * always an error. */
void renameGenericCommand(client *c, int nx) {
    robj *srckey = c->argv[1];
    robj *dstkey = c->argv[2];
    robj *o;
    long long expire;
    int samekey;

    /* Renaming a key to itself is a no-op if the key exists, however an
     * error is still returned if it does not, so the check on the source
     * key is performed anyway. */
    samekey = (sdscmp(srckey->ptr,dstkey->ptr) == 0);

    o = lookupKeyWriteOrReply(c,srckey,shared.nokeyerr);
    if (o == NULL) return;

    if (samekey) {
        addReply(c,nx ? shared.czero : shared.ok);
        return;
    }

    /* Take a reference on the value before the source key is deleted
     * below. */
    incrRefCount(o);
    expire = getExpire(c->db,srckey);

    if (lookupKeyWrite(c->db,dstkey) != NULL) {
        if (nx) {
            decrRefCount(o);
            addReply(c,shared.czero);
            return;
        }
        /* Overwrite: the old destination key must be removed before a new
         * key with the same name is created. */
        dbDelete(c->db,dstkey);
    }

    dbAdd(c->db,dstkey,o);
    if (expire != -1) setExpire(c->db,dstkey,expire);
    dbDelete(c->db,srckey);

    signalModifiedKey(c->db,srckey);
    signalModifiedKey(c->db,dstkey);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",srckey,c->db->id);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",dstkey,c->db->id);
    server.dirty++;
    addReply(c,nx ? shared.cone : shared.ok);
}
772 
/* RENAME key newkey: rename, overwriting 'newkey' if it exists. */
void renameCommand(client *c)
{
    renameGenericCommand(c, 0);
}
776 
/* RENAMENX key newkey: rename only if 'newkey' does not exist. */
void renamenxCommand(client *c)
{
    renameGenericCommand(c, 1);
}
780 
/* MOVE key db.
 * Move 'key', together with its expire if any, from the currently selected
 * DB to the DB with the specified index.
 *
 * The reply is 1 if the key was moved, and 0 if the key does not exist in
 * the source DB or already exists in the target DB. An error is returned
 * in cluster mode, when the DB index is not valid, or when the target DB
 * is the same as the source one. */
void moveCommand(client *c) {
    robj *o;
    redisDb *src, *dst;
    int srcid;
    long long dbid, expire;

    if (server.cluster_enabled) {
        addReplyError(c,"MOVE is not allowed in cluster mode");
        return;
    }

    /* Obtain source and target DB pointers */
    src = c->db;
    srcid = c->db->id;

    /* selectDb() is used to validate the index and to resolve the target
     * DB pointer: the client is switched back to the source DB below. */
    if (getLongLongFromObject(c->argv[2],&dbid) == C_ERR ||
        dbid < INT_MIN || dbid > INT_MAX ||
        selectDb(c,dbid) == C_ERR)
    {
        addReply(c,shared.outofrangeerr);
        return;
    }
    dst = c->db;
    selectDb(c,srcid); /* Back to the source DB */

    /* If the user is moving using as target the same
     * DB as the source DB it is probably an error. */
    if (src == dst) {
        addReply(c,shared.sameobjecterr);
        return;
    }

    /* Check if the element exists and get a reference */
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (!o) {
        addReply(c,shared.czero);
        return;
    }
    expire = getExpire(c->db,c->argv[1]);

    /* Return zero if the key already exists in the target DB */
    if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
        addReply(c,shared.czero);
        return;
    }
    dbAdd(dst,c->argv[1],o);
    if (expire != -1) setExpire(dst,c->argv[1],expire);
    /* The value is now referenced by the target DB as well. */
    incrRefCount(o);

    /* OK! key moved, free the entry in the source DB */
    dbDelete(src,c->argv[1]);

    /* The key disappeared from the source DB and appeared in the target
     * one: clients WATCHing it in either DB must be signaled. */
    signalModifiedKey(src,c->argv[1]);
    signalModifiedKey(dst,c->argv[1]);

    server.dirty++;
    addReply(c,shared.cone);
}
835 
836 /*-----------------------------------------------------------------------------
837  * Expires API
838  *----------------------------------------------------------------------------*/
839 
/* Drop the expire associated with 'key' in 'db'.
 *
 * The key must exist in the main dictionary (asserted).
 * Returns 1 if an entry was removed from db->expires, 0 otherwise. */
int removeExpire(redisDb *db, robj *key) {
    int removed;

    /* An expire entry without a matching entry in the main dict would
     * mean the key can never be freed, so treat it as a fatal error. */
    serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);

    removed = (dictDelete(db->expires,key->ptr) == DICT_OK);
    return removed;
}
846 
/* Associate the expire value 'when' with 'key' in 'db', replacing any
 * previous value. The key must already exist in the main dictionary
 * (asserted). */
void setExpire(redisDb *db, robj *key, long long when) {
    dictEntry *kde = dictFind(db->dict,key->ptr);
    dictEntry *expire_entry;

    serverAssertWithInfo(NULL,key,kde != NULL);

    /* The expires dict entry is keyed by the very same key pointer stored
     * in the main dict entry, so no copy of the key is made here. */
    expire_entry = dictReplaceRaw(db->expires,dictGetKey(kde));
    dictSetSignedIntegerVal(expire_entry,when);
}
856 
857 /* Return the expire time of the specified key, or -1 if no expire
858  * is associated with this key (i.e. the key is non volatile) */
getExpire(redisDb * db,robj * key)859 long long getExpire(redisDb *db, robj *key) {
860     dictEntry *de;
861 
862     /* No expire? return ASAP */
863     if (dictSize(db->expires) == 0 ||
864        (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
865 
866     /* The entry was found in the expire dict, this means it should also
867      * be present in the main dict (safety check). */
868     serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
869     return dictGetSignedIntegerVal(de);
870 }
871 
872 /* Propagate expires into slaves and the AOF file.
873  * When a key expires in the master, a DEL operation for this key is sent
874  * to all the slaves and the AOF file if enabled.
875  *
876  * This way the key expiry is centralized in one place, and since both
877  * AOF and the master->slave link guarantee operation ordering, everything
878  * will be consistent even if we allow write operations against expiring
879  * keys. */
propagateExpire(redisDb * db,robj * key)880 void propagateExpire(redisDb *db, robj *key) {
881     robj *argv[2];
882 
883     argv[0] = shared.del;
884     argv[1] = key;
885     incrRefCount(argv[0]);
886     incrRefCount(argv[1]);
887 
888     if (server.aof_state != AOF_OFF)
889         feedAppendOnlyFile(server.delCommand,db->id,argv,2);
890     replicationFeedSlaves(server.slaves,db->id,argv,2);
891 
892     decrRefCount(argv[0]);
893     decrRefCount(argv[1]);
894 }
895 
/* Check whether 'key' is expired and, on a master, delete it.
 *
 * Returns 0 when the key has no expire, when the server is loading, or
 * when the expire time has not been passed yet. On a slave
 * (server.masterhost set) nothing is deleted and the return value only
 * tells whether the key looks expired. Otherwise the expiration is
 * propagated, an "expired" keyspace event is emitted, and the result of
 * dbDelete() is returned. */
int expireIfNeeded(redisDb *db, robj *key) {
    mstime_t expire_at = getExpire(db,key);
    mstime_t now;

    /* No expire set, or dataset still loading (expiry is handled later):
     * nothing to do. */
    if (expire_at < 0 || server.loading) return 0;

    /* Inside a Lua script time is frozen at the moment the script started,
     * so that a key can expire only the first time it is accessed and not
     * in the middle of the script, keeping propagation to slaves / AOF
     * consistent. See issue #1525 on Github. */
    if (server.lua_caller) {
        now = server.lua_time_start;
    } else {
        now = mstime();
    }

    /* Slaves never expire keys on their own: the master sends synthesized
     * DEL operations. Still report whether the key looks expired. */
    if (server.masterhost != NULL) return now > expire_at;

    /* Not expired yet. */
    if (now <= expire_at) return 0;

    /* Expired on a master: account, propagate, notify and delete. */
    server.stat_expiredkeys++;
    propagateExpire(db,key);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",key,db->id);
    return dbDelete(db,key);
}
931 
932 /*-----------------------------------------------------------------------------
933  * Expires Commands
934  *----------------------------------------------------------------------------*/
935 
936 /* This is the generic command implementation for EXPIRE, PEXPIRE, EXPIREAT
937  * and PEXPIREAT. Because the commad second argument may be relative or absolute
938  * the "basetime" argument is used to signal what the base time is (either 0
939  * for *AT variants of the command, or the current time for relative expires).
940  *
941  * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
942  * the argv[2] parameter. The basetime is always specified in milliseconds. */
expireGenericCommand(client * c,long long basetime,int unit)943 void expireGenericCommand(client *c, long long basetime, int unit) {
944     robj *key = c->argv[1], *param = c->argv[2];
945     long long when; /* unix time in milliseconds when the key will expire. */
946 
947     if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
948         return;
949 
950     if (unit == UNIT_SECONDS) when *= 1000;
951     when += basetime;
952 
953     /* No key, return zero. */
954     if (lookupKeyWrite(c->db,key) == NULL) {
955         addReply(c,shared.czero);
956         return;
957     }
958 
959     /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
960      * should never be executed as a DEL when load the AOF or in the context
961      * of a slave instance.
962      *
963      * Instead we take the other branch of the IF statement setting an expire
964      * (possibly in the past) and wait for an explicit DEL from the master. */
965     if (when <= mstime() && !server.loading && !server.masterhost) {
966         robj *aux;
967 
968         serverAssertWithInfo(c,key,dbDelete(c->db,key));
969         server.dirty++;
970 
971         /* Replicate/AOF this as an explicit DEL. */
972         aux = createStringObject("DEL",3);
973         rewriteClientCommandVector(c,2,aux,key);
974         decrRefCount(aux);
975         signalModifiedKey(c->db,key);
976         notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
977         addReply(c, shared.cone);
978         return;
979     } else {
980         setExpire(c->db,key,when);
981         addReply(c,shared.cone);
982         signalModifiedKey(c->db,key);
983         notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
984         server.dirty++;
985         return;
986     }
987 }
988 
/* Relative expire in seconds: the base time is the current time in
 * milliseconds as returned by mstime(). */
void expireCommand(client *c) {
    long long basetime = mstime();

    expireGenericCommand(c,basetime,UNIT_SECONDS);
}
992 
/* Absolute expire in seconds: the base time is 0, so argv[2] is taken as
 * the expire time itself. */
void expireatCommand(client *c) {
    const long long basetime = 0;

    expireGenericCommand(c,basetime,UNIT_SECONDS);
}
996 
/* Relative expire in milliseconds: the base time is the current time in
 * milliseconds as returned by mstime(). */
void pexpireCommand(client *c) {
    long long basetime = mstime();

    expireGenericCommand(c,basetime,UNIT_MILLISECONDS);
}
1000 
/* Absolute expire in milliseconds: the base time is 0, so argv[2] is taken
 * as the expire time itself. */
void pexpireatCommand(client *c) {
    const long long basetime = 0;

    expireGenericCommand(c,basetime,UNIT_MILLISECONDS);
}
1004 
/* Shared implementation of the TTL-style commands.
 *
 * Replies with:
 *   -2  if the key in argv[1] does not exist;
 *   -1  if the key exists but has no expire;
 *   otherwise the remaining time to live (never negative), in milliseconds
 *   when 'output_ms' is non-zero, or rounded to the nearest second when it
 *   is zero. */
void ttlGenericCommand(client *c, int output_ms) {
    robj *key = c->argv[1];
    long long expire, ttl;

    /* The lookup is done with LOOKUP_NOTOUCH. */
    if (lookupKeyReadWithFlags(c->db,key,LOOKUP_NOTOUCH) == NULL) {
        addReplyLongLong(c,-2);
        return;
    }

    expire = getExpire(c->db,key);
    if (expire == -1) {
        /* The key exists but is not volatile. */
        addReplyLongLong(c,-1);
        return;
    }

    ttl = expire-mstime();
    if (ttl < 0) ttl = 0;
    if (!output_ms) ttl = (ttl+500)/1000;
    addReplyLongLong(c,ttl);
}
1026 
/* Time to live reported in seconds (output_ms = 0). */
void ttlCommand(client *c) {
    const int output_ms = 0;

    ttlGenericCommand(c, output_ms);
}
1030 
/* Time to live reported in milliseconds (output_ms = 1). */
void pttlCommand(client *c) {
    const int output_ms = 1;

    ttlGenericCommand(c, output_ms);
}
1034 
/* PERSIST implementation: removes the expire from the key in argv[1].
 *
 * Replies shared.cone (and marks the dataset dirty) if an expire was
 * removed, shared.czero if the key does not exist or has no expire. */
void persistCommand(client *c) {
    robj *key = c->argv[1];

    /* Go through lookupKeyWrite(), like the other write commands in this
     * file, instead of probing the main dict directly: a raw dictFind()
     * also matches keys whose expire time has already passed but that have
     * not been reclaimed yet, which would allow making such a key
     * persistent again.
     * NOTE(review): relies on lookupKeyWrite() performing the lazy expire
     * check before the lookup - confirm against its definition. */
    if (lookupKeyWrite(c->db,key) == NULL) {
        addReply(c,shared.czero);
        return;
    }

    if (removeExpire(c->db,key)) {
        addReply(c,shared.cone);
        server.dirty++;
    } else {
        addReply(c,shared.czero);
    }
}
1050 
1051 /* TOUCH key1 [key2 key3 ... keyN] */
touchCommand(client * c)1052 void touchCommand(client *c) {
1053     int touched = 0;
1054     for (int j = 1; j < c->argc; j++)
1055         if (lookupKeyRead(c->db,c->argv[j]) != NULL) touched++;
1056     addReplyLongLong(c,touched);
1057 }
1058 
1059 /* -----------------------------------------------------------------------------
1060  * API to get key arguments from commands
1061  * ---------------------------------------------------------------------------*/
1062 
1063 /* The base case is to use the keys position as given in the command table
1064  * (firstkey, lastkey, step). */
getKeysUsingCommandTable(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1065 int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, int *numkeys) {
1066     int j, i = 0, last, *keys;
1067     UNUSED(argv);
1068 
1069     if (cmd->firstkey == 0) {
1070         *numkeys = 0;
1071         return NULL;
1072     }
1073     last = cmd->lastkey;
1074     if (last < 0) last = argc+last;
1075     keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
1076     for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
1077         serverAssert(j < argc);
1078         keys[i++] = j;
1079     }
1080     *numkeys = i;
1081     return keys;
1082 }
1083 
1084 /* Return all the arguments that are keys in the command passed via argc / argv.
1085  *
1086  * The command returns the positions of all the key arguments inside the array,
1087  * so the actual return value is an heap allocated array of integers. The
1088  * length of the array is returned by reference into *numkeys.
1089  *
1090  * 'cmd' must be point to the corresponding entry into the redisCommand
1091  * table, according to the command name in argv[0].
1092  *
1093  * This function uses the command table if a command-specific helper function
1094  * is not required, otherwise it calls the command-specific function. */
getKeysFromCommand(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1095 int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1096     if (cmd->getkeys_proc) {
1097         return cmd->getkeys_proc(cmd,argv,argc,numkeys);
1098     } else {
1099         return getKeysUsingCommandTable(cmd,argv,argc,numkeys);
1100     }
1101 }
1102 
/* Release the positions array returned by getKeysFromCommand().
 * The pointer is handed to zfree() as is. */
void getKeysFreeResult(int *result) {
    int *positions = result;

    zfree(positions);
}
1107 
1108 /* Helper function to extract keys from following commands:
1109  * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
1110  * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */
zunionInterGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1111 int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1112     int i, num, *keys;
1113     UNUSED(cmd);
1114 
1115     num = atoi(argv[2]->ptr);
1116     /* Sanity check. Don't return any key if the command is going to
1117      * reply with syntax error. */
1118     if (num > (argc-3)) {
1119         *numkeys = 0;
1120         return NULL;
1121     }
1122 
1123     /* Keys in z{union,inter}store come from two places:
1124      * argv[1] = storage key,
1125      * argv[3...n] = keys to intersect */
1126     keys = zmalloc(sizeof(int)*(num+1));
1127 
1128     /* Add all key positions for argv[3...n] to keys[] */
1129     for (i = 0; i < num; i++) keys[i] = 3+i;
1130 
1131     /* Finally add the argv[1] key position (the storage key target). */
1132     keys[num] = 1;
1133     *numkeys = num+1;  /* Total keys = {union,inter} keys + storage key */
1134     return keys;
1135 }
1136 
1137 /* Helper function to extract keys from the following commands:
1138  * EVAL <script> <num-keys> <key> <key> ... <key> [more stuff]
1139  * EVALSHA <script> <num-keys> <key> <key> ... <key> [more stuff] */
evalGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1140 int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1141     int i, num, *keys;
1142     UNUSED(cmd);
1143 
1144     num = atoi(argv[2]->ptr);
1145     /* Sanity check. Don't return any key if the command is going to
1146      * reply with syntax error. */
1147     if (num > (argc-3)) {
1148         *numkeys = 0;
1149         return NULL;
1150     }
1151 
1152     keys = zmalloc(sizeof(int)*num);
1153     *numkeys = num;
1154 
1155     /* Add all key positions for argv[3...n] to keys[] */
1156     for (i = 0; i < num; i++) keys[i] = 3+i;
1157 
1158     return keys;
1159 }
1160 
1161 /* Helper function to extract keys from the SORT command.
1162  *
1163  * SORT <sort-key> ... STORE <store-key> ...
1164  *
1165  * The first argument of SORT is always a key, however a list of options
1166  * follow in SQL-alike style. Here we parse just the minimum in order to
1167  * correctly identify keys in the "STORE" option. */
sortGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1168 int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1169     int i, j, num, *keys, found_store = 0;
1170     UNUSED(cmd);
1171 
1172     num = 0;
1173     keys = zmalloc(sizeof(int)*2); /* Alloc 2 places for the worst case. */
1174 
1175     keys[num++] = 1; /* <sort-key> is always present. */
1176 
1177     /* Search for STORE option. By default we consider options to don't
1178      * have arguments, so if we find an unknown option name we scan the
1179      * next. However there are options with 1 or 2 arguments, so we
1180      * provide a list here in order to skip the right number of args. */
1181     struct {
1182         char *name;
1183         int skip;
1184     } skiplist[] = {
1185         {"limit", 2},
1186         {"get", 1},
1187         {"by", 1},
1188         {NULL, 0} /* End of elements. */
1189     };
1190 
1191     for (i = 2; i < argc; i++) {
1192         for (j = 0; skiplist[j].name != NULL; j++) {
1193             if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) {
1194                 i += skiplist[j].skip;
1195                 break;
1196             } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) {
1197                 /* Note: we don't increment "num" here and continue the loop
1198                  * to be sure to process the *last* "STORE" option if multiple
1199                  * ones are provided. This is same behavior as SORT. */
1200                 found_store = 1;
1201                 keys[num] = i+1; /* <store-key> */
1202                 break;
1203             }
1204         }
1205     }
1206     *numkeys = num + found_store;
1207     return keys;
1208 }
1209 
/* Helper function to extract keys from the MIGRATE command.
 *
 * By default the only key is argv[3]. If a "keys" option (case
 * insensitive) is found at position 6 or later while argv[3] is an empty
 * string, every argument following that option is reported as a key
 * instead.
 *
 * Returns a zmalloc()ed array of positions and stores its length in
 * *numkeys. */
int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
    int first = 3; /* Position of the first key: the obvious form. */
    int num = 1;   /* Number of keys: the obvious form. */
    int i, *keys;
    UNUSED(cmd);

    /* Look for the extended form with the KEYS option; the scan starts at
     * position 6, so nothing is inspected unless argc > 6. */
    for (i = 6; i < argc; i++) {
        if (strcasecmp(argv[i]->ptr,"keys") != 0) continue;
        if (sdslen(argv[3]->ptr) != 0) continue;

        first = i+1;
        num = argc-first;
        break;
    }

    keys = zmalloc(sizeof(int)*num);
    for (i = 0; i < num; i++) keys[i] = first+i;
    *numkeys = num;
    return keys;
}
1236 
1237 /* Slot to Key API. This is used by Redis Cluster in order to obtain in
1238  * a fast way a key that belongs to a specified hash slot. This is useful
1239  * while rehashing the cluster. */
/* Insert 'key' into the cluster slots_to_keys skiplist, using the hash
 * slot computed from the key name as score. A reference to the key object
 * is taken after the insertion. */
void slotToKeyAdd(robj *key) {
    unsigned int hashslot;

    hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
    zslInsert(server.cluster->slots_to_keys,hashslot,key);
    incrRefCount(key);
}
1246 
/* Delete 'key' from the cluster slots_to_keys skiplist; the score used for
 * the deletion is the hash slot computed from the key name. */
void slotToKeyDel(robj *key) {
    unsigned int hashslot;

    hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
    zslDelete(server.cluster->slots_to_keys,hashslot,key);
}
1252 
slotToKeyFlush(void)1253 void slotToKeyFlush(void) {
1254     zslFree(server.cluster->slots_to_keys);
1255     server.cluster->slots_to_keys = zslCreate();
1256 }
1257 
/* Store into 'keys' up to 'count' objects taken from the slots_to_keys
 * skiplist whose score equals 'hashslot', in skiplist order.
 *
 * The objects are stored as found (no reference is taken here).
 * Returns the number of entries written to 'keys'. */
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
    zskiplistNode *node;
    zrangespec range;
    int stored = 0;

    /* Inclusive range [hashslot,hashslot]. */
    range.min = hashslot;
    range.max = hashslot;
    range.minex = 0;
    range.maxex = 0;

    for (node = zslFirstInRange(server.cluster->slots_to_keys, &range);
         node && node->score == hashslot && count--;
         node = node->level[0].forward)
    {
        keys[stored++] = node->obj;
    }
    return stored;
}
1273 
1274 /* Remove all the keys in the specified hash slot.
1275  * The number of removed items is returned. */
delKeysInSlot(unsigned int hashslot)1276 unsigned int delKeysInSlot(unsigned int hashslot) {
1277     zskiplistNode *n;
1278     zrangespec range;
1279     int j = 0;
1280 
1281     range.min = range.max = hashslot;
1282     range.minex = range.maxex = 0;
1283 
1284     n = zslFirstInRange(server.cluster->slots_to_keys, &range);
1285     while(n && n->score == hashslot) {
1286         robj *key = n->obj;
1287         n = n->level[0].forward; /* Go to the next item before freeing it. */
1288         incrRefCount(key); /* Protect the object while freeing it. */
1289         dbDelete(&server.db[0],key);
1290         decrRefCount(key);
1291         j++;
1292     }
1293     return j;
1294 }
1295 
countKeysInSlot(unsigned int hashslot)1296 unsigned int countKeysInSlot(unsigned int hashslot) {
1297     zskiplist *zsl = server.cluster->slots_to_keys;
1298     zskiplistNode *zn;
1299     zrangespec range;
1300     int rank, count = 0;
1301 
1302     range.min = range.max = hashslot;
1303     range.minex = range.maxex = 0;
1304 
1305     /* Find first element in range */
1306     zn = zslFirstInRange(zsl, &range);
1307 
1308     /* Use rank of first element, if any, to determine preliminary count */
1309     if (zn != NULL) {
1310         rank = zslGetRank(zsl, zn->score, zn->obj);
1311         count = (zsl->length - (rank - 1));
1312 
1313         /* Find last element in range */
1314         zn = zslLastInRange(zsl, &range);
1315 
1316         /* Use rank of last element, if any, to determine the actual count */
1317         if (zn != NULL) {
1318             rank = zslGetRank(zsl, zn->score, zn->obj);
1319             count -= (zsl->length - rank);
1320         }
1321     }
1322     return count;
1323 }
1324