xref: /f-stack/app/redis-5.0.5/src/db.c (revision 572c4311)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "cluster.h"
32 #include "atomicvar.h"
33 
34 #include <signal.h>
35 #include <ctype.h>
36 
37 /*-----------------------------------------------------------------------------
38  * C-level DB API
39  *----------------------------------------------------------------------------*/
40 
41 int keyIsExpired(redisDb *db, robj *key);
42 
43 /* Update LFU when an object is accessed.
44  * Firstly, decrement the counter if the decrement time is reached.
45  * Then logarithmically increment the counter, and update the access time. */
updateLFU(robj * val)46 void updateLFU(robj *val) {
47     unsigned long counter = LFUDecrAndReturn(val);
48     counter = LFULogIncr(counter);
49     val->lru = (LFUGetTimeInMinutes()<<8) | counter;
50 }
51 
52 /* Low level key lookup API, not actually called directly from commands
53  * implementations that should instead rely on lookupKeyRead(),
54  * lookupKeyWrite() and lookupKeyReadWithFlags(). */
lookupKey(redisDb * db,robj * key,int flags)55 robj *lookupKey(redisDb *db, robj *key, int flags) {
56     dictEntry *de = dictFind(db->dict,key->ptr);
57     if (de) {
58         robj *val = dictGetVal(de);
59 
60         /* Update the access time for the ageing algorithm.
61          * Don't do it if we have a saving child, as this will trigger
62          * a copy on write madness. */
63         if (server.rdb_child_pid == -1 &&
64             server.aof_child_pid == -1 &&
65             !(flags & LOOKUP_NOTOUCH))
66         {
67             if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
68                 updateLFU(val);
69             } else {
70                 val->lru = LRU_CLOCK();
71             }
72         }
73         return val;
74     } else {
75         return NULL;
76     }
77 }
78 
79 /* Lookup a key for read operations, or return NULL if the key is not found
80  * in the specified DB.
81  *
82  * As a side effect of calling this function:
83  * 1. A key gets expired if it reached it's TTL.
84  * 2. The key last access time is updated.
85  * 3. The global keys hits/misses stats are updated (reported in INFO).
86  *
87  * This API should not be used when we write to the key after obtaining
88  * the object linked to the key, but only for read only operations.
89  *
90  * Flags change the behavior of this command:
91  *
92  *  LOOKUP_NONE (or zero): no special flags are passed.
93  *  LOOKUP_NOTOUCH: don't alter the last access time of the key.
94  *
95  * Note: this function also returns NULL if the key is logically expired
96  * but still existing, in case this is a slave, since this API is called only
97  * for read operations. Even if the key expiry is master-driven, we can
98  * correctly report a key is expired on slaves even if the master is lagging
99  * expiring our key via DELs in the replication link. */
lookupKeyReadWithFlags(redisDb * db,robj * key,int flags)100 robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
101     robj *val;
102 
103     if (expireIfNeeded(db,key) == 1) {
104         /* Key expired. If we are in the context of a master, expireIfNeeded()
105          * returns 0 only when the key does not exist at all, so it's safe
106          * to return NULL ASAP. */
107         if (server.masterhost == NULL) {
108             server.stat_keyspace_misses++;
109             return NULL;
110         }
111 
112         /* However if we are in the context of a slave, expireIfNeeded() will
113          * not really try to expire the key, it only returns information
114          * about the "logical" status of the key: key expiring is up to the
115          * master in order to have a consistent view of master's data set.
116          *
117          * However, if the command caller is not the master, and as additional
118          * safety measure, the command invoked is a read-only command, we can
119          * safely return NULL here, and provide a more consistent behavior
120          * to clients accessign expired values in a read-only fashion, that
121          * will say the key as non existing.
122          *
123          * Notably this covers GETs when slaves are used to scale reads. */
124         if (server.current_client &&
125             server.current_client != server.master &&
126             server.current_client->cmd &&
127             server.current_client->cmd->flags & CMD_READONLY)
128         {
129             server.stat_keyspace_misses++;
130             return NULL;
131         }
132     }
133     val = lookupKey(db,key,flags);
134     if (val == NULL)
135         server.stat_keyspace_misses++;
136     else
137         server.stat_keyspace_hits++;
138     return val;
139 }
140 
141 /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
142  * common case. */
lookupKeyRead(redisDb * db,robj * key)143 robj *lookupKeyRead(redisDb *db, robj *key) {
144     return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
145 }
146 
147 /* Lookup a key for write operations, and as a side effect, if needed, expires
148  * the key if its TTL is reached.
149  *
150  * Returns the linked value object if the key exists or NULL if the key
151  * does not exist in the specified DB. */
lookupKeyWrite(redisDb * db,robj * key)152 robj *lookupKeyWrite(redisDb *db, robj *key) {
153     expireIfNeeded(db,key);
154     return lookupKey(db,key,LOOKUP_NONE);
155 }
156 
/* lookupKeyRead() on the client's current DB; when the key is not found,
 * 'reply' is sent to the client. Returns the value or NULL. */
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
    robj *found = lookupKeyRead(c->db, key);
    if (found == NULL) addReply(c,reply);
    return found;
}
162 
/* lookupKeyWrite() on the client's current DB; when the key is not found,
 * 'reply' is sent to the client. Returns the value or NULL. */
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
    robj *found = lookupKeyWrite(c->db, key);
    if (found == NULL) addReply(c,reply);
    return found;
}
168 
169 /* Add the key to the DB. It's up to the caller to increment the reference
170  * counter of the value if needed.
171  *
172  * The program is aborted if the key already exists. */
dbAdd(redisDb * db,robj * key,robj * val)173 void dbAdd(redisDb *db, robj *key, robj *val) {
174     sds copy = sdsdup(key->ptr);
175     int retval = dictAdd(db->dict, copy, val);
176 
177     serverAssertWithInfo(NULL,key,retval == DICT_OK);
178     if (val->type == OBJ_LIST ||
179         val->type == OBJ_ZSET)
180         signalKeyAsReady(db, key);
181     if (server.cluster_enabled) slotToKeyAdd(key);
182 }
183 
184 /* Overwrite an existing key with a new value. Incrementing the reference
185  * count of the new value is up to the caller.
186  * This function does not modify the expire time of the existing key.
187  *
188  * The program is aborted if the key was not already present. */
dbOverwrite(redisDb * db,robj * key,robj * val)189 void dbOverwrite(redisDb *db, robj *key, robj *val) {
190     dictEntry *de = dictFind(db->dict,key->ptr);
191 
192     serverAssertWithInfo(NULL,key,de != NULL);
193     dictEntry auxentry = *de;
194     robj *old = dictGetVal(de);
195     if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
196         val->lru = old->lru;
197     }
198     dictSetVal(db->dict, de, val);
199 
200     if (server.lazyfree_lazy_server_del) {
201         freeObjAsync(old);
202         dictSetVal(db->dict, &auxentry, NULL);
203     }
204 
205     dictFreeVal(db->dict, &auxentry);
206 }
207 
208 /* High level Set operation. This function can be used in order to set
209  * a key, whatever it was existing or not, to a new object.
210  *
211  * 1) The ref count of the value object is incremented.
212  * 2) clients WATCHing for the destination key notified.
213  * 3) The expire time of the key is reset (the key is made persistent).
214  *
215  * All the new keys in the database should be created via this interface. */
setKey(redisDb * db,robj * key,robj * val)216 void setKey(redisDb *db, robj *key, robj *val) {
217     if (lookupKeyWrite(db,key) == NULL) {
218         dbAdd(db,key,val);
219     } else {
220         dbOverwrite(db,key,val);
221     }
222     incrRefCount(val);
223     removeExpire(db,key);
224     signalModifiedKey(db,key);
225 }
226 
/* Return 1 if 'key' is present in the main dictionary of 'db', 0 otherwise.
 * This is a plain dictionary lookup: expireIfNeeded() is not called. */
int dbExists(redisDb *db, robj *key) {
    dictEntry *entry = dictFind(db->dict,key->ptr);
    return entry != NULL;
}
230 
231 /* Return a random key, in form of a Redis object.
232  * If there are no keys, NULL is returned.
233  *
234  * The function makes sure to return keys not already expired. */
dbRandomKey(redisDb * db)235 robj *dbRandomKey(redisDb *db) {
236     dictEntry *de;
237     int maxtries = 100;
238     int allvolatile = dictSize(db->dict) == dictSize(db->expires);
239 
240     while(1) {
241         sds key;
242         robj *keyobj;
243 
244         de = dictGetRandomKey(db->dict);
245         if (de == NULL) return NULL;
246 
247         key = dictGetKey(de);
248         keyobj = createStringObject(key,sdslen(key));
249         if (dictFind(db->expires,key)) {
250             if (allvolatile && server.masterhost && --maxtries == 0) {
251                 /* If the DB is composed only of keys with an expire set,
252                  * it could happen that all the keys are already logically
253                  * expired in the slave, so the function cannot stop because
254                  * expireIfNeeded() is false, nor it can stop because
255                  * dictGetRandomKey() returns NULL (there are keys to return).
256                  * To prevent the infinite loop we do some tries, but if there
257                  * are the conditions for an infinite loop, eventually we
258                  * return a key name that may be already expired. */
259                 return keyobj;
260             }
261             if (expireIfNeeded(db,keyobj)) {
262                 decrRefCount(keyobj);
263                 continue; /* search for another key. This expired. */
264             }
265         }
266         return keyobj;
267     }
268 }
269 
270 /* Delete a key, value, and associated expiration entry if any, from the DB */
dbSyncDelete(redisDb * db,robj * key)271 int dbSyncDelete(redisDb *db, robj *key) {
272     /* Deleting an entry from the expires dict will not free the sds of
273      * the key, because it is shared with the main dictionary. */
274     if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
275     if (dictDelete(db->dict,key->ptr) == DICT_OK) {
276         if (server.cluster_enabled) slotToKeyDel(key);
277         return 1;
278     } else {
279         return 0;
280     }
281 }
282 
283 /* This is a wrapper whose behavior depends on the Redis lazy free
284  * configuration. Deletes the key synchronously or asynchronously. */
dbDelete(redisDb * db,robj * key)285 int dbDelete(redisDb *db, robj *key) {
286     return server.lazyfree_lazy_server_del ? dbAsyncDelete(db,key) :
287                                              dbSyncDelete(db,key);
288 }
289 
290 /* Prepare the string object stored at 'key' to be modified destructively
291  * to implement commands like SETBIT or APPEND.
292  *
293  * An object is usually ready to be modified unless one of the two conditions
294  * are true:
295  *
296  * 1) The object 'o' is shared (refcount > 1), we don't want to affect
297  *    other users.
298  * 2) The object encoding is not "RAW".
299  *
300  * If the object is found in one of the above conditions (or both) by the
301  * function, an unshared / not-encoded copy of the string object is stored
302  * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
303  * returned.
304  *
305  * USAGE:
306  *
307  * The object 'o' is what the caller already obtained by looking up 'key'
308  * in 'db', the usage pattern looks like this:
309  *
310  * o = lookupKeyWrite(db,key);
311  * if (checkType(c,o,OBJ_STRING)) return;
312  * o = dbUnshareStringValue(db,key,o);
313  *
314  * At this point the caller is ready to modify the object, for example
315  * using an sdscat() call to append some data, or anything else.
316  */
dbUnshareStringValue(redisDb * db,robj * key,robj * o)317 robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
318     serverAssert(o->type == OBJ_STRING);
319     if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
320         robj *decoded = getDecodedObject(o);
321         o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
322         decrRefCount(decoded);
323         dbOverwrite(db,key,o);
324     }
325     return o;
326 }
327 
328 /* Remove all keys from all the databases in a Redis server.
329  * If callback is given the function is called from time to time to
330  * signal that work is in progress.
331  *
332  * The dbnum can be -1 if all the DBs should be flushed, or the specified
333  * DB number if we want to flush only a single Redis database number.
334  *
335  * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
336  * EMPTYDB_ASYNC if we want the memory to be freed in a different thread
337  * and the function to return ASAP.
338  *
339  * On success the fuction returns the number of keys removed from the
340  * database(s). Otherwise -1 is returned in the specific case the
341  * DB number is out of range, and errno is set to EINVAL. */
emptyDb(int dbnum,int flags,void (callback)(void *))342 long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
343     int async = (flags & EMPTYDB_ASYNC);
344     long long removed = 0;
345 
346     if (dbnum < -1 || dbnum >= server.dbnum) {
347         errno = EINVAL;
348         return -1;
349     }
350 
351     int startdb, enddb;
352     if (dbnum == -1) {
353         startdb = 0;
354         enddb = server.dbnum-1;
355     } else {
356         startdb = enddb = dbnum;
357     }
358 
359     for (int j = startdb; j <= enddb; j++) {
360         removed += dictSize(server.db[j].dict);
361         if (async) {
362             emptyDbAsync(&server.db[j]);
363         } else {
364             dictEmpty(server.db[j].dict,callback);
365             dictEmpty(server.db[j].expires,callback);
366         }
367     }
368     if (server.cluster_enabled) {
369         if (async) {
370             slotToKeyFlushAsync();
371         } else {
372             slotToKeyFlush();
373         }
374     }
375     if (dbnum == -1) flushSlaveKeysWithExpireList();
376     return removed;
377 }
378 
/* Make server.db[id] the current DB of client 'c'.
 * Returns C_OK on success, or C_ERR (leaving the client untouched) when
 * 'id' is negative or not smaller than server.dbnum. */
int selectDb(client *c, int id) {
    int in_range = (id >= 0 && id < server.dbnum);

    if (!in_range) return C_ERR;
    c->db = &server.db[id];
    return C_OK;
}
385 
386 /*-----------------------------------------------------------------------------
387  * Hooks for key space changes.
388  *
389  * Every time a key in the database is modified the function
390  * signalModifiedKey() is called.
391  *
392  * Every time a DB is flushed the function signalFlushedDb() is called.
393  *----------------------------------------------------------------------------*/
394 
/* Hook called when 'key' in 'db' is modified: it forwards the event to
 * touchWatchedKey(). */
void signalModifiedKey(redisDb *db, robj *key) {
    touchWatchedKey(db, key);
}
398 
/* Hook called when a DB is flushed: it forwards 'dbid' to
 * touchWatchedKeysOnFlush(). Callers in this file pass -1 when all the
 * DBs are being flushed. */
void signalFlushedDb(int dbid) {
    touchWatchedKeysOnFlush(dbid);
}
402 
403 /*-----------------------------------------------------------------------------
404  * Type agnostic commands operating on the key space
405  *----------------------------------------------------------------------------*/
406 
407 /* Return the set of flags to use for the emptyDb() call for FLUSHALL
408  * and FLUSHDB commands.
409  *
410  * Currently the command just attempts to parse the "ASYNC" option. It
411  * also checks if the command arity is wrong.
412  *
413  * On success C_OK is returned and the flags are stored in *flags, otherwise
414  * C_ERR is returned and the function sends an error to the client. */
getFlushCommandFlags(client * c,int * flags)415 int getFlushCommandFlags(client *c, int *flags) {
416     /* Parse the optional ASYNC option. */
417     if (c->argc > 1) {
418         if (c->argc > 2 || strcasecmp(c->argv[1]->ptr,"async")) {
419             addReply(c,shared.syntaxerr);
420             return C_ERR;
421         }
422         *flags = EMPTYDB_ASYNC;
423     } else {
424         *flags = EMPTYDB_NO_FLAGS;
425     }
426     return C_OK;
427 }
428 
429 /* FLUSHDB [ASYNC]
430  *
431  * Flushes the currently SELECTed Redis DB. */
flushdbCommand(client * c)432 void flushdbCommand(client *c) {
433     int flags;
434 
435     if (getFlushCommandFlags(c,&flags) == C_ERR) return;
436     signalFlushedDb(c->db->id);
437     server.dirty += emptyDb(c->db->id,flags,NULL);
438     addReply(c,shared.ok);
439 }
440 
441 /* FLUSHALL [ASYNC]
442  *
443  * Flushes the whole server data set. */
flushallCommand(client * c)444 void flushallCommand(client *c) {
445     int flags;
446 
447     if (getFlushCommandFlags(c,&flags) == C_ERR) return;
448     signalFlushedDb(-1);
449     server.dirty += emptyDb(-1,flags,NULL);
450     addReply(c,shared.ok);
451     if (server.rdb_child_pid != -1) {
452         kill(server.rdb_child_pid,SIGUSR1);
453         rdbRemoveTempFile(server.rdb_child_pid);
454     }
455     if (server.saveparamslen > 0) {
456         /* Normally rdbSave() will reset dirty, but we don't want this here
457          * as otherwise FLUSHALL will not be replicated nor put into the AOF. */
458         int saved_dirty = server.dirty;
459         rdbSaveInfo rsi, *rsiptr;
460         rsiptr = rdbPopulateSaveInfo(&rsi);
461         rdbSave(server.rdb_filename,rsiptr);
462         server.dirty = saved_dirty;
463     }
464     server.dirty++;
465 }
466 
467 /* This command implements DEL and LAZYDEL. */
delGenericCommand(client * c,int lazy)468 void delGenericCommand(client *c, int lazy) {
469     int numdel = 0, j;
470 
471     for (j = 1; j < c->argc; j++) {
472         expireIfNeeded(c->db,c->argv[j]);
473         int deleted  = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
474                               dbSyncDelete(c->db,c->argv[j]);
475         if (deleted) {
476             signalModifiedKey(c->db,c->argv[j]);
477             notifyKeyspaceEvent(NOTIFY_GENERIC,
478                 "del",c->argv[j],c->db->id);
479             server.dirty++;
480             numdel++;
481         }
482     }
483     addReplyLongLong(c,numdel);
484 }
485 
/* DEL: delGenericCommand() with lazy deletion disabled. */
void delCommand(client *c) {
    const int lazy = 0;
    delGenericCommand(c, lazy);
}
489 
/* UNLINK: delGenericCommand() with lazy deletion enabled. */
void unlinkCommand(client *c) {
    const int lazy = 1;
    delGenericCommand(c, lazy);
}
493 
494 /* EXISTS key1 key2 ... key_N.
495  * Return value is the number of keys existing. */
existsCommand(client * c)496 void existsCommand(client *c) {
497     long long count = 0;
498     int j;
499 
500     for (j = 1; j < c->argc; j++) {
501         if (lookupKeyRead(c->db,c->argv[j])) count++;
502     }
503     addReplyLongLong(c,count);
504 }
505 
/* SELECT index
 *
 * Switches the client to the DB with the given index.
 *
 * Replies:
 * - "invalid DB index" if the argument cannot be parsed as a long;
 * - "SELECT is not allowed in cluster mode" if cluster mode is enabled and
 *   the index is not 0;
 * - "DB index is out of range" if the index does not identify a DB;
 * - OK otherwise. */
void selectCommand(client *c) {
    long id;

    if (getLongFromObjectOrReply(c, c->argv[1], &id,
        "invalid DB index") != C_OK)
        return;

    if (server.cluster_enabled && id != 0) {
        addReplyError(c,"SELECT is not allowed in cluster mode");
        return;
    }

    /* The index is parsed as a long but selectDb() takes an int. The range
     * is validated here, on the long value, so that an index that does not
     * fit in an int cannot wrap around to a valid DB number when it is
     * narrowed (e.g. 4294967296 becoming 0 where long is 64 bits). */
    if (id < 0 || id >= server.dbnum || selectDb(c,(int)id) == C_ERR) {
        addReplyError(c,"DB index is out of range");
    } else {
        addReply(c,shared.ok);
    }
}
523 
/* RANDOMKEY
 *
 * Replies with a key name obtained from dbRandomKey() on the client's
 * current DB, or with a null bulk when no key is returned. */
void randomkeyCommand(client *c) {
    robj *key = dbRandomKey(c->db);

    if (key != NULL) {
        addReplyBulk(c,key);
        decrRefCount(key);
    } else {
        addReply(c,shared.nullbulk);
    }
}
535 
/* KEYS pattern
 *
 * Walks the whole main dictionary of the client's current DB and replies
 * with every key name that matches the pattern and for which
 * keyIsExpired() returns false. The reply length is not known in advance,
 * so a deferred multi bulk length is used and set at the end. */
void keysCommand(client *c) {
    sds pattern = c->argv[1]->ptr;
    int plen = sdslen(pattern);
    /* A pattern of exactly "*" matches everything: skip the matcher. */
    int allkeys = (pattern[0] == '*' && pattern[1] == '\0');
    unsigned long numkeys = 0;
    void *replylen = addDeferredMultiBulkLength(c);
    dictIterator *di = dictGetSafeIterator(c->db->dict);
    dictEntry *de;

    while ((de = dictNext(di)) != NULL) {
        sds key = dictGetKey(de);

        if (!allkeys && !stringmatchlen(pattern,plen,key,sdslen(key),0))
            continue;

        robj *keyobj = createStringObject(key,sdslen(key));
        if (!keyIsExpired(c->db,keyobj)) {
            addReplyBulk(c,keyobj);
            numkeys++;
        }
        decrRefCount(keyobj);
    }
    dictReleaseIterator(di);
    setDeferredMultiBulkLength(c,replylen,numkeys);
}
562 
563 /* This callback is used by scanGenericCommand in order to collect elements
564  * returned by the dictionary iterator into a list. */
scanCallback(void * privdata,const dictEntry * de)565 void scanCallback(void *privdata, const dictEntry *de) {
566     void **pd = (void**) privdata;
567     list *keys = pd[0];
568     robj *o = pd[1];
569     robj *key, *val = NULL;
570 
571     if (o == NULL) {
572         sds sdskey = dictGetKey(de);
573         key = createStringObject(sdskey, sdslen(sdskey));
574     } else if (o->type == OBJ_SET) {
575         sds keysds = dictGetKey(de);
576         key = createStringObject(keysds,sdslen(keysds));
577     } else if (o->type == OBJ_HASH) {
578         sds sdskey = dictGetKey(de);
579         sds sdsval = dictGetVal(de);
580         key = createStringObject(sdskey,sdslen(sdskey));
581         val = createStringObject(sdsval,sdslen(sdsval));
582     } else if (o->type == OBJ_ZSET) {
583         sds sdskey = dictGetKey(de);
584         key = createStringObject(sdskey,sdslen(sdskey));
585         val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
586     } else {
587         serverPanic("Type not handled in SCAN callback.");
588     }
589 
590     listAddNodeTail(keys, key);
591     if (val) listAddNodeTail(keys, val);
592 }
593 
594 /* Try to parse a SCAN cursor stored at object 'o':
595  * if the cursor is valid, store it as unsigned integer into *cursor and
596  * returns C_OK. Otherwise return C_ERR and send an error to the
597  * client. */
parseScanCursorOrReply(client * c,robj * o,unsigned long * cursor)598 int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
599     char *eptr;
600 
601     /* Use strtoul() because we need an *unsigned* long, so
602      * getLongLongFromObject() does not cover the whole cursor space. */
603     errno = 0;
604     *cursor = strtoul(o->ptr, &eptr, 10);
605     if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE)
606     {
607         addReplyError(c, "invalid cursor");
608         return C_ERR;
609     }
610     return C_OK;
611 }
612 
613 /* This command implements SCAN, HSCAN and SSCAN commands.
614  * If object 'o' is passed, then it must be a Hash or Set object, otherwise
615  * if 'o' is NULL the command will operate on the dictionary associated with
616  * the current database.
617  *
618  * When 'o' is not NULL the function assumes that the first argument in
619  * the client arguments vector is a key so it skips it before iterating
620  * in order to parse options.
621  *
622  * In the case of a Hash object the function returns both the field and value
623  * of every element on the Hash. */
scanGenericCommand(client * c,robj * o,unsigned long cursor)624 void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
625     int i, j;
626     list *keys = listCreate();
627     listNode *node, *nextnode;
628     long count = 10;
629     sds pat = NULL;
630     int patlen = 0, use_pattern = 0;
631     dict *ht;
632 
633     /* Object must be NULL (to iterate keys names), or the type of the object
634      * must be Set, Sorted Set, or Hash. */
635     serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
636                 o->type == OBJ_ZSET);
637 
638     /* Set i to the first option argument. The previous one is the cursor. */
639     i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
640 
641     /* Step 1: Parse options. */
642     while (i < c->argc) {
643         j = c->argc - i;
644         if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
645             if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
646                 != C_OK)
647             {
648                 goto cleanup;
649             }
650 
651             if (count < 1) {
652                 addReply(c,shared.syntaxerr);
653                 goto cleanup;
654             }
655 
656             i += 2;
657         } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
658             pat = c->argv[i+1]->ptr;
659             patlen = sdslen(pat);
660 
661             /* The pattern always matches if it is exactly "*", so it is
662              * equivalent to disabling it. */
663             use_pattern = !(pat[0] == '*' && patlen == 1);
664 
665             i += 2;
666         } else {
667             addReply(c,shared.syntaxerr);
668             goto cleanup;
669         }
670     }
671 
672     /* Step 2: Iterate the collection.
673      *
674      * Note that if the object is encoded with a ziplist, intset, or any other
675      * representation that is not a hash table, we are sure that it is also
676      * composed of a small number of elements. So to avoid taking state we
677      * just return everything inside the object in a single call, setting the
678      * cursor to zero to signal the end of the iteration. */
679 
680     /* Handle the case of a hash table. */
681     ht = NULL;
682     if (o == NULL) {
683         ht = c->db->dict;
684     } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
685         ht = o->ptr;
686     } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
687         ht = o->ptr;
688         count *= 2; /* We return key / value for this type. */
689     } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
690         zset *zs = o->ptr;
691         ht = zs->dict;
692         count *= 2; /* We return key / value for this type. */
693     }
694 
695     if (ht) {
696         void *privdata[2];
697         /* We set the max number of iterations to ten times the specified
698          * COUNT, so if the hash table is in a pathological state (very
699          * sparsely populated) we avoid to block too much time at the cost
700          * of returning no or very few elements. */
701         long maxiterations = count*10;
702 
703         /* We pass two pointers to the callback: the list to which it will
704          * add new elements, and the object containing the dictionary so that
705          * it is possible to fetch more data in a type-dependent way. */
706         privdata[0] = keys;
707         privdata[1] = o;
708         do {
709             cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
710         } while (cursor &&
711               maxiterations-- &&
712               listLength(keys) < (unsigned long)count);
713     } else if (o->type == OBJ_SET) {
714         int pos = 0;
715         int64_t ll;
716 
717         while(intsetGet(o->ptr,pos++,&ll))
718             listAddNodeTail(keys,createStringObjectFromLongLong(ll));
719         cursor = 0;
720     } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
721         unsigned char *p = ziplistIndex(o->ptr,0);
722         unsigned char *vstr;
723         unsigned int vlen;
724         long long vll;
725 
726         while(p) {
727             ziplistGet(p,&vstr,&vlen,&vll);
728             listAddNodeTail(keys,
729                 (vstr != NULL) ? createStringObject((char*)vstr,vlen) :
730                                  createStringObjectFromLongLong(vll));
731             p = ziplistNext(o->ptr,p);
732         }
733         cursor = 0;
734     } else {
735         serverPanic("Not handled encoding in SCAN.");
736     }
737 
738     /* Step 3: Filter elements. */
739     node = listFirst(keys);
740     while (node) {
741         robj *kobj = listNodeValue(node);
742         nextnode = listNextNode(node);
743         int filter = 0;
744 
745         /* Filter element if it does not match the pattern. */
746         if (!filter && use_pattern) {
747             if (sdsEncodedObject(kobj)) {
748                 if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
749                     filter = 1;
750             } else {
751                 char buf[LONG_STR_SIZE];
752                 int len;
753 
754                 serverAssert(kobj->encoding == OBJ_ENCODING_INT);
755                 len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
756                 if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
757             }
758         }
759 
760         /* Filter element if it is an expired key. */
761         if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;
762 
763         /* Remove the element and its associted value if needed. */
764         if (filter) {
765             decrRefCount(kobj);
766             listDelNode(keys, node);
767         }
768 
769         /* If this is a hash or a sorted set, we have a flat list of
770          * key-value elements, so if this element was filtered, remove the
771          * value, or skip it if it was not filtered: we only match keys. */
772         if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
773             node = nextnode;
774             nextnode = listNextNode(node);
775             if (filter) {
776                 kobj = listNodeValue(node);
777                 decrRefCount(kobj);
778                 listDelNode(keys, node);
779             }
780         }
781         node = nextnode;
782     }
783 
784     /* Step 4: Reply to the client. */
785     addReplyMultiBulkLen(c, 2);
786     addReplyBulkLongLong(c,cursor);
787 
788     addReplyMultiBulkLen(c, listLength(keys));
789     while ((node = listFirst(keys)) != NULL) {
790         robj *kobj = listNodeValue(node);
791         addReplyBulk(c, kobj);
792         decrRefCount(kobj);
793         listDelNode(keys, node);
794     }
795 
796 cleanup:
797     listSetFreeMethod(keys,decrRefCountVoid);
798     listRelease(keys);
799 }
800 
801 /* The SCAN command completely relies on scanGenericCommand. */
scanCommand(client * c)802 void scanCommand(client *c) {
803     unsigned long cursor;
804     if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
805     scanGenericCommand(c,NULL,cursor);
806 }
807 
/* DBSIZE: reply with the number of entries stored in the main dictionary
 * of the database currently selected by the client. */
void dbsizeCommand(client *c) {
    redisDb *db = c->db;
    long long nkeys = dictSize(db->dict);

    addReplyLongLong(c,nkeys);
}
811 
/* LASTSAVE: reply with the current value of server.lastsave as an
 * integer. */
void lastsaveCommand(client *c) {
    long long saved_at = server.lastsave;

    addReplyLongLong(c,saved_at);
}
815 
/* TYPE key
 *
 * Replies with a status string naming the type of the value stored at
 * argv[1]: "none" when the lookup finds nothing, the name registered by the
 * module type for module values, and "unknown" for any type code that is
 * not handled here. The key is looked up with the LOOKUP_NOTOUCH flag. */
void typeCommand(client *c) {
    char *tname = "none";
    robj *val = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);

    if (val != NULL) {
        switch(val->type) {
        case OBJ_STRING:
            tname = "string";
            break;
        case OBJ_LIST:
            tname = "list";
            break;
        case OBJ_SET:
            tname = "set";
            break;
        case OBJ_ZSET:
            tname = "zset";
            break;
        case OBJ_HASH:
            tname = "hash";
            break;
        case OBJ_STREAM:
            tname = "stream";
            break;
        case OBJ_MODULE:
            /* Module values carry their own type descriptor. */
            tname = ((moduleValue*)val->ptr)->type->name;
            break;
        default:
            tname = "unknown";
            break;
        }
    }
    addReplyStatus(c,tname);
}
840 
/* SHUTDOWN [NOSAVE|SAVE]
 *
 * At most one option is accepted (case insensitive); anything else gets a
 * syntax error reply. If prepareForShutdown() succeeds the process exits
 * with status 0, otherwise an error is returned to the client and the
 * server keeps running. */
void shutdownCommand(client *c) {
    int flags = 0;

    if (c->argc == 2) {
        const char *opt = c->argv[1]->ptr;

        if (strcasecmp(opt,"nosave") == 0) {
            flags |= SHUTDOWN_NOSAVE;
        } else if (strcasecmp(opt,"save") == 0) {
            flags |= SHUTDOWN_SAVE;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    } else if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    }

    /* While a dataset is still being loaded no save must be attempted on
     * shutdown, since it could overwrite the current DB with half-read
     * data. Sentinel mode forces NOSAVE as well. */
    if (server.loading || server.sentinel_mode) {
        flags &= ~SHUTDOWN_SAVE;
        flags |= SHUTDOWN_NOSAVE;
    }

    if (prepareForShutdown(flags) == C_OK) exit(0);
    addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
}
868 
/* Shared implementation of the rename commands: argv[1] is the source key
 * and argv[2] the destination key.
 *
 * When 'nx' is non-zero an existing destination is never overwritten and
 * the reply is an integer (shared.cone on success, shared.czero if nothing
 * was done); when 'nx' is zero an existing destination is deleted first and
 * the reply on success is shared.ok. A missing source key is reported with
 * shared.nokeyerr. The expire of the source key, if any, is carried over to
 * the destination. */
void renameGenericCommand(client *c, int nx) {
    robj *srckey = c->argv[1], *dstkey = c->argv[2];

    /* Source and destination being the same name is a no-op, but only
     * after checking that the key exists: a missing key is still an
     * error. */
    int samekey = (sdscmp(srckey->ptr,dstkey->ptr) == 0);

    robj *val = lookupKeyWriteOrReply(c,srckey,shared.nokeyerr);
    if (val == NULL) return;

    if (samekey) {
        addReply(c,nx ? shared.czero : shared.ok);
        return;
    }

    /* Take a reference: the value is added under the new name before the
     * old name is deleted. */
    incrRefCount(val);
    long long expire = getExpire(c->db,srckey);

    if (lookupKeyWrite(c->db,dstkey) != NULL) {
        if (nx) {
            decrRefCount(val);
            addReply(c,shared.czero);
            return;
        }
        /* Overwrite: drop the old destination before creating the new
         * key with the same name. */
        dbDelete(c->db,dstkey);
    }

    dbAdd(c->db,dstkey,val);
    if (expire != -1) setExpire(c,c->db,dstkey,expire);
    dbDelete(c->db,srckey);

    signalModifiedKey(c->db,srckey);
    signalModifiedKey(c->db,dstkey);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",srckey,c->db->id);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",dstkey,c->db->id);
    server.dirty++;

    addReply(c,nx ? shared.cone : shared.ok);
}
910 
/* RENAME: renameGenericCommand() with the 'nx' flag off, so an existing
 * destination key is overwritten. */
void renameCommand(client *c) {
    const int nx = 0;

    renameGenericCommand(c,nx);
}
914 
/* RENAMENX: renameGenericCommand() with the 'nx' flag on, so an existing
 * destination key is left untouched. */
void renamenxCommand(client *c) {
    const int nx = 1;

    renameGenericCommand(c,nx);
}
918 
/* MOVE key db
 *
 * Moves the key in argv[1] from the DB currently selected by the client to
 * the DB whose index is given in argv[2], carrying over the expire time if
 * there is one.
 *
 * Replies:
 *  - an error when cluster mode is enabled, when the DB index is not an
 *    integer in the int range or cannot be selected, or when source and
 *    target DB are the same;
 *  - shared.czero when the key is missing in the source DB or already
 *    present in the target DB (nothing is modified);
 *  - shared.cone when the key was moved.
 *
 * On success the key is signaled as modified in both databases and the
 * "move_from" / "move_to" keyspace events are emitted, mirroring what
 * renameGenericCommand() does for RENAME. */
void moveCommand(client *c) {
    robj *o;
    redisDb *src, *dst;
    int srcid;
    long long dbid, expire;

    if (server.cluster_enabled) {
        addReplyError(c,"MOVE is not allowed in cluster mode");
        return;
    }

    /* Obtain source and target DB pointers */
    src = c->db;
    srcid = c->db->id;

    /* selectDb() takes an int: check the range before the conversion. */
    if (getLongLongFromObject(c->argv[2],&dbid) == C_ERR ||
        dbid < INT_MIN || dbid > INT_MAX ||
        selectDb(c,dbid) == C_ERR)
    {
        addReply(c,shared.outofrangeerr);
        return;
    }
    dst = c->db;
    selectDb(c,srcid); /* Back to the source DB */

    /* If the user is moving using as target the same
     * DB as the source DB it is probably an error. */
    if (src == dst) {
        addReply(c,shared.sameobjecterr);
        return;
    }

    /* Check if the element exists and get a reference */
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (!o) {
        addReply(c,shared.czero);
        return;
    }
    expire = getExpire(c->db,c->argv[1]);

    /* Return zero if the key already exists in the target DB */
    if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
        addReply(c,shared.czero);
        return;
    }
    dbAdd(dst,c->argv[1],o);
    if (expire != -1) setExpire(c,dst,c->argv[1],expire);
    /* The value is now referenced by the target DB too: take the extra
     * reference before the source entry is deleted below. */
    incrRefCount(o);

    /* OK! key moved, free the entry in the source DB */
    dbDelete(src,c->argv[1]);

    /* The key disappeared from the source DB and appeared in the target
     * DB: flag it as modified in both, and publish the keyspace events. */
    signalModifiedKey(src,c->argv[1]);
    signalModifiedKey(dst,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"move_from",
        c->argv[1],src->id);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"move_to",
        c->argv[1],dst->id);

    server.dirty++;
    addReply(c,shared.cone);
}
973 
974 /* Helper function for dbSwapDatabases(): scans the list of keys that have
975  * one or more blocked clients for B[LR]POP or other blocking commands
976  * and signal the keys as ready if they are of the right type. See the comment
977  * where the function is used for more info. */
scanDatabaseForReadyLists(redisDb * db)978 void scanDatabaseForReadyLists(redisDb *db) {
979     dictEntry *de;
980     dictIterator *di = dictGetSafeIterator(db->blocking_keys);
981     while((de = dictNext(di)) != NULL) {
982         robj *key = dictGetKey(de);
983         robj *value = lookupKey(db,key,LOOKUP_NOTOUCH);
984         if (value && (value->type == OBJ_LIST ||
985                       value->type == OBJ_STREAM ||
986                       value->type == OBJ_ZSET))
987             signalKeyAsReady(db, key);
988     }
989     dictReleaseIterator(di);
990 }
991 
992 /* Swap two databases at runtime so that all clients will magically see
993  * the new database even if already connected. Note that the client
994  * structure c->db points to a given DB, so we need to be smarter and
995  * swap the underlying referenced structures, otherwise we would need
996  * to fix all the references to the Redis DB structure.
997  *
998  * Returns C_ERR if at least one of the DB ids are out of range, otherwise
999  * C_OK is returned. */
dbSwapDatabases(int id1,int id2)1000 int dbSwapDatabases(int id1, int id2) {
1001     if (id1 < 0 || id1 >= server.dbnum ||
1002         id2 < 0 || id2 >= server.dbnum) return C_ERR;
1003     if (id1 == id2) return C_OK;
1004     redisDb aux = server.db[id1];
1005     redisDb *db1 = &server.db[id1], *db2 = &server.db[id2];
1006 
1007     /* Swap hash tables. Note that we don't swap blocking_keys,
1008      * ready_keys and watched_keys, since we want clients to
1009      * remain in the same DB they were. */
1010     db1->dict = db2->dict;
1011     db1->expires = db2->expires;
1012     db1->avg_ttl = db2->avg_ttl;
1013 
1014     db2->dict = aux.dict;
1015     db2->expires = aux.expires;
1016     db2->avg_ttl = aux.avg_ttl;
1017 
1018     /* Now we need to handle clients blocked on lists: as an effect
1019      * of swapping the two DBs, a client that was waiting for list
1020      * X in a given DB, may now actually be unblocked if X happens
1021      * to exist in the new version of the DB, after the swap.
1022      *
1023      * However normally we only do this check for efficiency reasons
1024      * in dbAdd() when a list is created. So here we need to rescan
1025      * the list of clients blocked on lists and signal lists as ready
1026      * if needed. */
1027     scanDatabaseForReadyLists(db1);
1028     scanDatabaseForReadyLists(db2);
1029     return C_OK;
1030 }
1031 
1032 /* SWAPDB db1 db2 */
swapdbCommand(client * c)1033 void swapdbCommand(client *c) {
1034     long id1, id2;
1035 
1036     /* Not allowed in cluster mode: we have just DB 0 there. */
1037     if (server.cluster_enabled) {
1038         addReplyError(c,"SWAPDB is not allowed in cluster mode");
1039         return;
1040     }
1041 
1042     /* Get the two DBs indexes. */
1043     if (getLongFromObjectOrReply(c, c->argv[1], &id1,
1044         "invalid first DB index") != C_OK)
1045         return;
1046 
1047     if (getLongFromObjectOrReply(c, c->argv[2], &id2,
1048         "invalid second DB index") != C_OK)
1049         return;
1050 
1051     /* Swap... */
1052     if (dbSwapDatabases(id1,id2) == C_ERR) {
1053         addReplyError(c,"DB index is out of range");
1054         return;
1055     } else {
1056         server.dirty++;
1057         addReply(c,shared.ok);
1058     }
1059 }
1060 
1061 /*-----------------------------------------------------------------------------
1062  * Expires API
1063  *----------------------------------------------------------------------------*/
1064 
/* Remove the expire associated with 'key' in 'db'.
 *
 * The key must exist in the main dict (asserted): an expire may only be
 * removed if there is a corresponding entry there, otherwise the key would
 * never be freed.
 *
 * Returns 1 if an entry was deleted from db->expires, 0 otherwise. */
int removeExpire(redisDb *db, robj *key) {
    dictEntry *main_entry = dictFind(db->dict,key->ptr);

    serverAssertWithInfo(NULL,key,main_entry != NULL);

    if (dictDelete(db->expires,key->ptr) == DICT_OK) return 1;
    return 0;
}
1071 
1072 /* Set an expire to the specified key. If the expire is set in the context
1073  * of an user calling a command 'c' is the client, otherwise 'c' is set
1074  * to NULL. The 'when' parameter is the absolute unix time in milliseconds
1075  * after which the key will no longer be considered valid. */
setExpire(client * c,redisDb * db,robj * key,long long when)1076 void setExpire(client *c, redisDb *db, robj *key, long long when) {
1077     dictEntry *kde, *de;
1078 
1079     /* Reuse the sds from the main dict in the expire dict */
1080     kde = dictFind(db->dict,key->ptr);
1081     serverAssertWithInfo(NULL,key,kde != NULL);
1082     de = dictAddOrFind(db->expires,dictGetKey(kde));
1083     dictSetSignedIntegerVal(de,when);
1084 
1085     int writable_slave = server.masterhost && server.repl_slave_ro == 0;
1086     if (c && writable_slave && !(c->flags & CLIENT_MASTER))
1087         rememberSlaveKeyWithExpire(db,key);
1088 }
1089 
1090 /* Return the expire time of the specified key, or -1 if no expire
1091  * is associated with this key (i.e. the key is non volatile) */
getExpire(redisDb * db,robj * key)1092 long long getExpire(redisDb *db, robj *key) {
1093     dictEntry *de;
1094 
1095     /* No expire? return ASAP */
1096     if (dictSize(db->expires) == 0 ||
1097        (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
1098 
1099     /* The entry was found in the expire dict, this means it should also
1100      * be present in the main dict (safety check). */
1101     serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
1102     return dictGetSignedIntegerVal(de);
1103 }
1104 
1105 /* Propagate expires into slaves and the AOF file.
1106  * When a key expires in the master, a DEL operation for this key is sent
1107  * to all the slaves and the AOF file if enabled.
1108  *
1109  * This way the key expiry is centralized in one place, and since both
1110  * AOF and the master->slave link guarantee operation ordering, everything
1111  * will be consistent even if we allow write operations against expiring
1112  * keys. */
propagateExpire(redisDb * db,robj * key,int lazy)1113 void propagateExpire(redisDb *db, robj *key, int lazy) {
1114     robj *argv[2];
1115 
1116     argv[0] = lazy ? shared.unlink : shared.del;
1117     argv[1] = key;
1118     incrRefCount(argv[0]);
1119     incrRefCount(argv[1]);
1120 
1121     if (server.aof_state != AOF_OFF)
1122         feedAppendOnlyFile(server.delCommand,db->id,argv,2);
1123     replicationFeedSlaves(server.slaves,db->id,argv,2);
1124 
1125     decrRefCount(argv[0]);
1126     decrRefCount(argv[1]);
1127 }
1128 
1129 /* Check if the key is expired. */
keyIsExpired(redisDb * db,robj * key)1130 int keyIsExpired(redisDb *db, robj *key) {
1131     mstime_t when = getExpire(db,key);
1132 
1133     if (when < 0) return 0; /* No expire for this key */
1134 
1135     /* Don't expire anything while loading. It will be done later. */
1136     if (server.loading) return 0;
1137 
1138     /* If we are in the context of a Lua script, we pretend that time is
1139      * blocked to when the Lua script started. This way a key can expire
1140      * only the first time it is accessed and not in the middle of the
1141      * script execution, making propagation to slaves / AOF consistent.
1142      * See issue #1525 on Github for more information. */
1143     mstime_t now = server.lua_caller ? server.lua_time_start : mstime();
1144 
1145     return now > when;
1146 }
1147 
1148 /* This function is called when we are going to perform some operation
1149  * in a given key, but such key may be already logically expired even if
1150  * it still exists in the database. The main way this function is called
1151  * is via lookupKey*() family of functions.
1152  *
1153  * The behavior of the function depends on the replication role of the
1154  * instance, because slave instances do not expire keys, they wait
1155  * for DELs from the master for consistency matters. However even
1156  * slaves will try to have a coherent return value for the function,
1157  * so that read commands executed in the slave side will be able to
1158  * behave like if the key is expired even if still present (because the
1159  * master has yet to propagate the DEL).
1160  *
1161  * In masters as a side effect of finding a key which is expired, such
1162  * key will be evicted from the database. Also this may trigger the
1163  * propagation of a DEL/UNLINK command in AOF / replication stream.
1164  *
1165  * The return value of the function is 0 if the key is still valid,
1166  * otherwise the function returns 1 if the key is expired. */
expireIfNeeded(redisDb * db,robj * key)1167 int expireIfNeeded(redisDb *db, robj *key) {
1168     if (!keyIsExpired(db,key)) return 0;
1169 
1170     /* If we are running in the context of a slave, instead of
1171      * evicting the expired key from the database, we return ASAP:
1172      * the slave key expiration is controlled by the master that will
1173      * send us synthesized DEL operations for expired keys.
1174      *
1175      * Still we try to return the right information to the caller,
1176      * that is, 0 if we think the key should be still valid, 1 if
1177      * we think the key is expired at this time. */
1178     if (server.masterhost != NULL) return 1;
1179 
1180     /* Delete the key */
1181     server.stat_expiredkeys++;
1182     propagateExpire(db,key,server.lazyfree_lazy_expire);
1183     notifyKeyspaceEvent(NOTIFY_EXPIRED,
1184         "expired",key,db->id);
1185     return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
1186                                          dbSyncDelete(db,key);
1187 }
1188 
1189 /* -----------------------------------------------------------------------------
1190  * API to get key arguments from commands
1191  * ---------------------------------------------------------------------------*/
1192 
1193 /* The base case is to use the keys position as given in the command table
1194  * (firstkey, lastkey, step). */
getKeysUsingCommandTable(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1195 int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, int *numkeys) {
1196     int j, i = 0, last, *keys;
1197     UNUSED(argv);
1198 
1199     if (cmd->firstkey == 0) {
1200         *numkeys = 0;
1201         return NULL;
1202     }
1203 
1204     last = cmd->lastkey;
1205     if (last < 0) last = argc+last;
1206     keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
1207     for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
1208         if (j >= argc) {
1209             /* Modules commands, and standard commands with a not fixed number
1210              * of arguments (negative arity parameter) do not have dispatch
1211              * time arity checks, so we need to handle the case where the user
1212              * passed an invalid number of arguments here. In this case we
1213              * return no keys and expect the command implementation to report
1214              * an arity or syntax error. */
1215             if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
1216                 zfree(keys);
1217                 *numkeys = 0;
1218                 return NULL;
1219             } else {
1220                 serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
1221             }
1222         }
1223         keys[i++] = j;
1224     }
1225     *numkeys = i;
1226     return keys;
1227 }
1228 
1229 /* Return all the arguments that are keys in the command passed via argc / argv.
1230  *
1231  * The command returns the positions of all the key arguments inside the array,
1232  * so the actual return value is an heap allocated array of integers. The
1233  * length of the array is returned by reference into *numkeys.
1234  *
1235  * 'cmd' must be point to the corresponding entry into the redisCommand
1236  * table, according to the command name in argv[0].
1237  *
1238  * This function uses the command table if a command-specific helper function
1239  * is not required, otherwise it calls the command-specific function. */
getKeysFromCommand(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1240 int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1241     if (cmd->flags & CMD_MODULE_GETKEYS) {
1242         return moduleGetCommandKeysViaAPI(cmd,argv,argc,numkeys);
1243     } else if (!(cmd->flags & CMD_MODULE) && cmd->getkeys_proc) {
1244         return cmd->getkeys_proc(cmd,argv,argc,numkeys);
1245     } else {
1246         return getKeysUsingCommandTable(cmd,argv,argc,numkeys);
1247     }
1248 }
1249 
/* Release an array of key positions previously returned by
 * getKeysFromCommand(): the pointer is simply handed to zfree(). */
void getKeysFreeResult(int *result) {
    zfree(result);
}
1254 
1255 /* Helper function to extract keys from following commands:
1256  * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
1257  * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */
zunionInterGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1258 int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1259     int i, num, *keys;
1260     UNUSED(cmd);
1261 
1262     num = atoi(argv[2]->ptr);
1263     /* Sanity check. Don't return any key if the command is going to
1264      * reply with syntax error. */
1265     if (num < 1 || num > (argc-3)) {
1266         *numkeys = 0;
1267         return NULL;
1268     }
1269 
1270     /* Keys in z{union,inter}store come from two places:
1271      * argv[1] = storage key,
1272      * argv[3...n] = keys to intersect */
1273     keys = zmalloc(sizeof(int)*(num+1));
1274 
1275     /* Add all key positions for argv[3...n] to keys[] */
1276     for (i = 0; i < num; i++) keys[i] = 3+i;
1277 
1278     /* Finally add the argv[1] key position (the storage key target). */
1279     keys[num] = 1;
1280     *numkeys = num+1;  /* Total keys = {union,inter} keys + storage key */
1281     return keys;
1282 }
1283 
1284 /* Helper function to extract keys from the following commands:
1285  * EVAL <script> <num-keys> <key> <key> ... <key> [more stuff]
1286  * EVALSHA <script> <num-keys> <key> <key> ... <key> [more stuff] */
evalGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1287 int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1288     int i, num, *keys;
1289     UNUSED(cmd);
1290 
1291     num = atoi(argv[2]->ptr);
1292     /* Sanity check. Don't return any key if the command is going to
1293      * reply with syntax error. */
1294     if (num <= 0 || num > (argc-3)) {
1295         *numkeys = 0;
1296         return NULL;
1297     }
1298 
1299     keys = zmalloc(sizeof(int)*num);
1300     *numkeys = num;
1301 
1302     /* Add all key positions for argv[3...n] to keys[] */
1303     for (i = 0; i < num; i++) keys[i] = 3+i;
1304 
1305     return keys;
1306 }
1307 
1308 /* Helper function to extract keys from the SORT command.
1309  *
1310  * SORT <sort-key> ... STORE <store-key> ...
1311  *
1312  * The first argument of SORT is always a key, however a list of options
1313  * follow in SQL-alike style. Here we parse just the minimum in order to
1314  * correctly identify keys in the "STORE" option. */
sortGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1315 int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1316     int i, j, num, *keys, found_store = 0;
1317     UNUSED(cmd);
1318 
1319     num = 0;
1320     keys = zmalloc(sizeof(int)*2); /* Alloc 2 places for the worst case. */
1321 
1322     keys[num++] = 1; /* <sort-key> is always present. */
1323 
1324     /* Search for STORE option. By default we consider options to don't
1325      * have arguments, so if we find an unknown option name we scan the
1326      * next. However there are options with 1 or 2 arguments, so we
1327      * provide a list here in order to skip the right number of args. */
1328     struct {
1329         char *name;
1330         int skip;
1331     } skiplist[] = {
1332         {"limit", 2},
1333         {"get", 1},
1334         {"by", 1},
1335         {NULL, 0} /* End of elements. */
1336     };
1337 
1338     for (i = 2; i < argc; i++) {
1339         for (j = 0; skiplist[j].name != NULL; j++) {
1340             if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) {
1341                 i += skiplist[j].skip;
1342                 break;
1343             } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) {
1344                 /* Note: we don't increment "num" here and continue the loop
1345                  * to be sure to process the *last* "STORE" option if multiple
1346                  * ones are provided. This is same behavior as SORT. */
1347                 found_store = 1;
1348                 keys[num] = i+1; /* <store-key> */
1349                 break;
1350             }
1351         }
1352     }
1353     *numkeys = num + found_store;
1354     return keys;
1355 }
1356 
/* Key extraction helper for MIGRATE.
 *
 * In the basic form the only key is argv[3]. In the extended form, a
 * "keys" option found at position 6 or later while argv[3] is an empty
 * string, every argument after that option is a key. */
int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
    int first = 3, nkeys = 1; /* Assume the obvious form. */
    int pos, k, *keys;
    UNUSED(cmd);

    /* But check for the extended one with the KEYS option. */
    for (pos = 6; pos < argc; pos++) {
        if (strcasecmp(argv[pos]->ptr,"keys") != 0) continue;
        if (sdslen(argv[3]->ptr) != 0) continue;

        first = pos+1;
        nkeys = argc-first;
        break;
    }

    keys = zmalloc(sizeof(int)*nkeys);
    for (k = 0; k < nkeys; k++) {
        keys[k] = first+k;
    }
    *numkeys = nkeys;
    return keys;
}
1383 
1384 /* Helper function to extract keys from following commands:
1385  * GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC]
1386  *                             [COUNT count] [STORE key] [STOREDIST key]
1387  * GEORADIUSBYMEMBER key member radius unit ... options ... */
georadiusGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1388 int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1389     int i, num, *keys;
1390     UNUSED(cmd);
1391 
1392     /* Check for the presence of the stored key in the command */
1393     int stored_key = -1;
1394     for (i = 5; i < argc; i++) {
1395         char *arg = argv[i]->ptr;
1396         /* For the case when user specifies both "store" and "storedist" options, the
1397          * second key specified would override the first key. This behavior is kept
1398          * the same as in georadiusCommand method.
1399          */
1400         if ((!strcasecmp(arg, "store") || !strcasecmp(arg, "storedist")) && ((i+1) < argc)) {
1401             stored_key = i+1;
1402             i++;
1403         }
1404     }
1405     num = 1 + (stored_key == -1 ? 0 : 1);
1406 
1407     /* Keys in the command come from two places:
1408      * argv[1] = key,
1409      * argv[5...n] = stored key if present
1410      */
1411     keys = zmalloc(sizeof(int) * num);
1412 
1413     /* Add all key positions to keys[] */
1414     keys[0] = 1;
1415     if(num > 1) {
1416          keys[1] = stored_key;
1417     }
1418     *numkeys = num;
1419     return keys;
1420 }
1421 
1422 /* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
1423  *       STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N */
xreadGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1424 int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1425     int i, num = 0, *keys;
1426     UNUSED(cmd);
1427 
1428     /* We need to parse the options of the command in order to seek the first
1429      * "STREAMS" string which is actually the option. This is needed because
1430      * "STREAMS" could also be the name of the consumer group and even the
1431      * name of the stream key. */
1432     int streams_pos = -1;
1433     for (i = 1; i < argc; i++) {
1434         char *arg = argv[i]->ptr;
1435         if (!strcasecmp(arg, "block")) {
1436             i++; /* Skip option argument. */
1437         } else if (!strcasecmp(arg, "count")) {
1438             i++; /* Skip option argument. */
1439         } else if (!strcasecmp(arg, "group")) {
1440             i += 2; /* Skip option argument. */
1441         } else if (!strcasecmp(arg, "noack")) {
1442             /* Nothing to do. */
1443         } else if (!strcasecmp(arg, "streams")) {
1444             streams_pos = i;
1445             break;
1446         } else {
1447             break; /* Syntax error. */
1448         }
1449     }
1450     if (streams_pos != -1) num = argc - streams_pos - 1;
1451 
1452     /* Syntax error. */
1453     if (streams_pos == -1 || num == 0 || num % 2 != 0) {
1454         *numkeys = 0;
1455         return NULL;
1456     }
1457     num /= 2; /* We have half the keys as there are arguments because
1458                  there are also the IDs, one per key. */
1459 
1460     keys = zmalloc(sizeof(int) * num);
1461     for (i = streams_pos+1; i < argc-num; i++) keys[i-streams_pos-1] = i;
1462     *numkeys = num;
1463     return keys;
1464 }
1465 
1466 /* Slot to Key API. This is used by Redis Cluster in order to obtain in
1467  * a fast way a key that belongs to a specified hash slot. This is useful
1468  * while rehashing the cluster and in other conditions when we need to
1469  * understand if we have keys for a given hash slot. */
slotToKeyUpdateKey(robj * key,int add)1470 void slotToKeyUpdateKey(robj *key, int add) {
1471     unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
1472     unsigned char buf[64];
1473     unsigned char *indexed = buf;
1474     size_t keylen = sdslen(key->ptr);
1475 
1476     server.cluster->slots_keys_count[hashslot] += add ? 1 : -1;
1477     if (keylen+2 > 64) indexed = zmalloc(keylen+2);
1478     indexed[0] = (hashslot >> 8) & 0xff;
1479     indexed[1] = hashslot & 0xff;
1480     memcpy(indexed+2,key->ptr,keylen);
1481     if (add) {
1482         raxInsert(server.cluster->slots_to_keys,indexed,keylen+2,NULL,NULL);
1483     } else {
1484         raxRemove(server.cluster->slots_to_keys,indexed,keylen+2,NULL);
1485     }
1486     if (indexed != buf) zfree(indexed);
1487 }
1488 
/* Add 'key' to the slot -> keys index. */
void slotToKeyAdd(robj *key) {
    const int add = 1;

    slotToKeyUpdateKey(key,add);
}
1492 
/* Remove 'key' from the slot -> keys index. */
void slotToKeyDel(robj *key) {
    const int add = 0;

    slotToKeyUpdateKey(key,add);
}
1496 
slotToKeyFlush(void)1497 void slotToKeyFlush(void) {
1498     raxFree(server.cluster->slots_to_keys);
1499     server.cluster->slots_to_keys = raxNew();
1500     memset(server.cluster->slots_keys_count,0,
1501            sizeof(server.cluster->slots_keys_count));
1502 }
1503 
1504 /* Pupulate the specified array of objects with keys in the specified slot.
1505  * New objects are returned to represent keys, it's up to the caller to
1506  * decrement the reference count to release the keys names. */
getKeysInSlot(unsigned int hashslot,robj ** keys,unsigned int count)1507 unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
1508     raxIterator iter;
1509     int j = 0;
1510     unsigned char indexed[2];
1511 
1512     indexed[0] = (hashslot >> 8) & 0xff;
1513     indexed[1] = hashslot & 0xff;
1514     raxStart(&iter,server.cluster->slots_to_keys);
1515     raxSeek(&iter,">=",indexed,2);
1516     while(count-- && raxNext(&iter)) {
1517         if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
1518         keys[j++] = createStringObject((char*)iter.key+2,iter.key_len-2);
1519     }
1520     raxStop(&iter);
1521     return j;
1522 }
1523 
1524 /* Remove all the keys in the specified hash slot.
1525  * The number of removed items is returned. */
delKeysInSlot(unsigned int hashslot)1526 unsigned int delKeysInSlot(unsigned int hashslot) {
1527     raxIterator iter;
1528     int j = 0;
1529     unsigned char indexed[2];
1530 
1531     indexed[0] = (hashslot >> 8) & 0xff;
1532     indexed[1] = hashslot & 0xff;
1533     raxStart(&iter,server.cluster->slots_to_keys);
1534     while(server.cluster->slots_keys_count[hashslot]) {
1535         raxSeek(&iter,">=",indexed,2);
1536         raxNext(&iter);
1537 
1538         robj *key = createStringObject((char*)iter.key+2,iter.key_len-2);
1539         dbDelete(&server.db[0],key);
1540         decrRefCount(key);
1541         j++;
1542     }
1543     raxStop(&iter);
1544     return j;
1545 }
1546 
countKeysInSlot(unsigned int hashslot)1547 unsigned int countKeysInSlot(unsigned int hashslot) {
1548     return server.cluster->slots_keys_count[hashslot];
1549 }
1550