1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31 #include "cluster.h"
32 #include "atomicvar.h"
33
34 #include <signal.h>
35 #include <ctype.h>
36
37 /*-----------------------------------------------------------------------------
38 * C-level DB API
39 *----------------------------------------------------------------------------*/
40
/* Forward declaration: keyIsExpired() is called by keysCommand() below before
 * its definition is visible (the definition is not in this part of the file). */
int keyIsExpired(redisDb *db, robj *key);
42
43 /* Update LFU when an object is accessed.
44 * Firstly, decrement the counter if the decrement time is reached.
45 * Then logarithmically increment the counter, and update the access time. */
updateLFU(robj * val)46 void updateLFU(robj *val) {
47 unsigned long counter = LFUDecrAndReturn(val);
48 counter = LFULogIncr(counter);
49 val->lru = (LFUGetTimeInMinutes()<<8) | counter;
50 }
51
52 /* Low level key lookup API, not actually called directly from commands
53 * implementations that should instead rely on lookupKeyRead(),
54 * lookupKeyWrite() and lookupKeyReadWithFlags(). */
lookupKey(redisDb * db,robj * key,int flags)55 robj *lookupKey(redisDb *db, robj *key, int flags) {
56 dictEntry *de = dictFind(db->dict,key->ptr);
57 if (de) {
58 robj *val = dictGetVal(de);
59
60 /* Update the access time for the ageing algorithm.
61 * Don't do it if we have a saving child, as this will trigger
62 * a copy on write madness. */
63 if (server.rdb_child_pid == -1 &&
64 server.aof_child_pid == -1 &&
65 !(flags & LOOKUP_NOTOUCH))
66 {
67 if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
68 updateLFU(val);
69 } else {
70 val->lru = LRU_CLOCK();
71 }
72 }
73 return val;
74 } else {
75 return NULL;
76 }
77 }
78
79 /* Lookup a key for read operations, or return NULL if the key is not found
80 * in the specified DB.
81 *
82 * As a side effect of calling this function:
83 * 1. A key gets expired if it reached it's TTL.
84 * 2. The key last access time is updated.
85 * 3. The global keys hits/misses stats are updated (reported in INFO).
86 *
87 * This API should not be used when we write to the key after obtaining
88 * the object linked to the key, but only for read only operations.
89 *
90 * Flags change the behavior of this command:
91 *
92 * LOOKUP_NONE (or zero): no special flags are passed.
93 * LOOKUP_NOTOUCH: don't alter the last access time of the key.
94 *
95 * Note: this function also returns NULL if the key is logically expired
96 * but still existing, in case this is a slave, since this API is called only
97 * for read operations. Even if the key expiry is master-driven, we can
98 * correctly report a key is expired on slaves even if the master is lagging
99 * expiring our key via DELs in the replication link. */
lookupKeyReadWithFlags(redisDb * db,robj * key,int flags)100 robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
101 robj *val;
102
103 if (expireIfNeeded(db,key) == 1) {
104 /* Key expired. If we are in the context of a master, expireIfNeeded()
105 * returns 0 only when the key does not exist at all, so it's safe
106 * to return NULL ASAP. */
107 if (server.masterhost == NULL) {
108 server.stat_keyspace_misses++;
109 return NULL;
110 }
111
112 /* However if we are in the context of a slave, expireIfNeeded() will
113 * not really try to expire the key, it only returns information
114 * about the "logical" status of the key: key expiring is up to the
115 * master in order to have a consistent view of master's data set.
116 *
117 * However, if the command caller is not the master, and as additional
118 * safety measure, the command invoked is a read-only command, we can
119 * safely return NULL here, and provide a more consistent behavior
120 * to clients accessign expired values in a read-only fashion, that
121 * will say the key as non existing.
122 *
123 * Notably this covers GETs when slaves are used to scale reads. */
124 if (server.current_client &&
125 server.current_client != server.master &&
126 server.current_client->cmd &&
127 server.current_client->cmd->flags & CMD_READONLY)
128 {
129 server.stat_keyspace_misses++;
130 return NULL;
131 }
132 }
133 val = lookupKey(db,key,flags);
134 if (val == NULL)
135 server.stat_keyspace_misses++;
136 else
137 server.stat_keyspace_hits++;
138 return val;
139 }
140
141 /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
142 * common case. */
lookupKeyRead(redisDb * db,robj * key)143 robj *lookupKeyRead(redisDb *db, robj *key) {
144 return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
145 }
146
147 /* Lookup a key for write operations, and as a side effect, if needed, expires
148 * the key if its TTL is reached.
149 *
150 * Returns the linked value object if the key exists or NULL if the key
151 * does not exist in the specified DB. */
lookupKeyWrite(redisDb * db,robj * key)152 robj *lookupKeyWrite(redisDb *db, robj *key) {
153 expireIfNeeded(db,key);
154 return lookupKey(db,key,LOOKUP_NONE);
155 }
156
/* Read lookup of 'key' in the DB selected by client 'c'. When the key is not
 * found, 'reply' is sent to the client. Returns the value or NULL. */
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
    robj *found = lookupKeyRead(c->db, key);

    if (found == NULL) {
        addReply(c,reply);
    }
    return found;
}
162
/* Write lookup of 'key' in the DB selected by client 'c'. When the key is not
 * found, 'reply' is sent to the client. Returns the value or NULL. */
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
    robj *found = lookupKeyWrite(c->db, key);

    if (found == NULL) {
        addReply(c,reply);
    }
    return found;
}
168
169 /* Add the key to the DB. It's up to the caller to increment the reference
170 * counter of the value if needed.
171 *
172 * The program is aborted if the key already exists. */
dbAdd(redisDb * db,robj * key,robj * val)173 void dbAdd(redisDb *db, robj *key, robj *val) {
174 sds copy = sdsdup(key->ptr);
175 int retval = dictAdd(db->dict, copy, val);
176
177 serverAssertWithInfo(NULL,key,retval == DICT_OK);
178 if (val->type == OBJ_LIST ||
179 val->type == OBJ_ZSET)
180 signalKeyAsReady(db, key);
181 if (server.cluster_enabled) slotToKeyAdd(key);
182 }
183
/* Overwrite the value of an existing key with 'val'.
 *
 * The reference count of the new value is not touched: incrementing it is
 * up to the caller. The expire time of the existing key is not modified.
 * When the maxmemory policy is LFU based, the access information (lru field)
 * of the old value is carried over to the new one.
 *
 * The program is aborted if the key was not already present. */
void dbOverwrite(redisDb *db, robj *key, robj *val) {
    dictEntry *de = dictFind(db->dict,key->ptr);

    serverAssertWithInfo(NULL,key,de != NULL);
    /* Take a copy of the entry while it still references the old value:
     * the copy is what gets passed to dictFreeVal() at the end, after the
     * real entry has been switched to the new value. */
    dictEntry auxentry = *de;
    robj *old = dictGetVal(de);
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
        val->lru = old->lru;
    }
    dictSetVal(db->dict, de, val);

    /* With lazy freeing enabled the old value is handed to freeObjAsync()
     * and the copy is cleared, so that the dictFreeVal() call below is
     * given a NULL value instead of the old object.
     * NOTE(review): presumably the value destructor ignores NULL; confirm. */
    if (server.lazyfree_lazy_server_del) {
        freeObjAsync(old);
        dictSetVal(db->dict, &auxentry, NULL);
    }

    dictFreeVal(db->dict, &auxentry);
}
207
208 /* High level Set operation. This function can be used in order to set
209 * a key, whatever it was existing or not, to a new object.
210 *
211 * 1) The ref count of the value object is incremented.
212 * 2) clients WATCHing for the destination key notified.
213 * 3) The expire time of the key is reset (the key is made persistent).
214 *
215 * All the new keys in the database should be created via this interface. */
setKey(redisDb * db,robj * key,robj * val)216 void setKey(redisDb *db, robj *key, robj *val) {
217 if (lookupKeyWrite(db,key) == NULL) {
218 dbAdd(db,key,val);
219 } else {
220 dbOverwrite(db,key,val);
221 }
222 incrRefCount(val);
223 removeExpire(db,key);
224 signalModifiedKey(db,key);
225 }
226
/* Return 1 if 'key' is present in the main dictionary of 'db', 0 otherwise.
 * This is a plain dictionary lookup: it does not go through expireIfNeeded()
 * and does not update the key access information. */
int dbExists(redisDb *db, robj *key) {
    dictEntry *entry = dictFind(db->dict,key->ptr);
    return entry != NULL;
}
230
231 /* Return a random key, in form of a Redis object.
232 * If there are no keys, NULL is returned.
233 *
234 * The function makes sure to return keys not already expired. */
dbRandomKey(redisDb * db)235 robj *dbRandomKey(redisDb *db) {
236 dictEntry *de;
237 int maxtries = 100;
238 int allvolatile = dictSize(db->dict) == dictSize(db->expires);
239
240 while(1) {
241 sds key;
242 robj *keyobj;
243
244 de = dictGetRandomKey(db->dict);
245 if (de == NULL) return NULL;
246
247 key = dictGetKey(de);
248 keyobj = createStringObject(key,sdslen(key));
249 if (dictFind(db->expires,key)) {
250 if (allvolatile && server.masterhost && --maxtries == 0) {
251 /* If the DB is composed only of keys with an expire set,
252 * it could happen that all the keys are already logically
253 * expired in the slave, so the function cannot stop because
254 * expireIfNeeded() is false, nor it can stop because
255 * dictGetRandomKey() returns NULL (there are keys to return).
256 * To prevent the infinite loop we do some tries, but if there
257 * are the conditions for an infinite loop, eventually we
258 * return a key name that may be already expired. */
259 return keyobj;
260 }
261 if (expireIfNeeded(db,keyobj)) {
262 decrRefCount(keyobj);
263 continue; /* search for another key. This expired. */
264 }
265 }
266 return keyobj;
267 }
268 }
269
270 /* Delete a key, value, and associated expiration entry if any, from the DB */
dbSyncDelete(redisDb * db,robj * key)271 int dbSyncDelete(redisDb *db, robj *key) {
272 /* Deleting an entry from the expires dict will not free the sds of
273 * the key, because it is shared with the main dictionary. */
274 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
275 if (dictDelete(db->dict,key->ptr) == DICT_OK) {
276 if (server.cluster_enabled) slotToKeyDel(key);
277 return 1;
278 } else {
279 return 0;
280 }
281 }
282
283 /* This is a wrapper whose behavior depends on the Redis lazy free
284 * configuration. Deletes the key synchronously or asynchronously. */
dbDelete(redisDb * db,robj * key)285 int dbDelete(redisDb *db, robj *key) {
286 return server.lazyfree_lazy_server_del ? dbAsyncDelete(db,key) :
287 dbSyncDelete(db,key);
288 }
289
290 /* Prepare the string object stored at 'key' to be modified destructively
291 * to implement commands like SETBIT or APPEND.
292 *
293 * An object is usually ready to be modified unless one of the two conditions
294 * are true:
295 *
296 * 1) The object 'o' is shared (refcount > 1), we don't want to affect
297 * other users.
298 * 2) The object encoding is not "RAW".
299 *
300 * If the object is found in one of the above conditions (or both) by the
301 * function, an unshared / not-encoded copy of the string object is stored
302 * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
303 * returned.
304 *
305 * USAGE:
306 *
307 * The object 'o' is what the caller already obtained by looking up 'key'
308 * in 'db', the usage pattern looks like this:
309 *
310 * o = lookupKeyWrite(db,key);
311 * if (checkType(c,o,OBJ_STRING)) return;
312 * o = dbUnshareStringValue(db,key,o);
313 *
314 * At this point the caller is ready to modify the object, for example
315 * using an sdscat() call to append some data, or anything else.
316 */
dbUnshareStringValue(redisDb * db,robj * key,robj * o)317 robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
318 serverAssert(o->type == OBJ_STRING);
319 if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
320 robj *decoded = getDecodedObject(o);
321 o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
322 decrRefCount(decoded);
323 dbOverwrite(db,key,o);
324 }
325 return o;
326 }
327
328 /* Remove all keys from all the databases in a Redis server.
329 * If callback is given the function is called from time to time to
330 * signal that work is in progress.
331 *
332 * The dbnum can be -1 if all the DBs should be flushed, or the specified
333 * DB number if we want to flush only a single Redis database number.
334 *
335 * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
336 * EMPTYDB_ASYNC if we want the memory to be freed in a different thread
337 * and the function to return ASAP.
338 *
339 * On success the fuction returns the number of keys removed from the
340 * database(s). Otherwise -1 is returned in the specific case the
341 * DB number is out of range, and errno is set to EINVAL. */
emptyDb(int dbnum,int flags,void (callback)(void *))342 long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
343 int async = (flags & EMPTYDB_ASYNC);
344 long long removed = 0;
345
346 if (dbnum < -1 || dbnum >= server.dbnum) {
347 errno = EINVAL;
348 return -1;
349 }
350
351 int startdb, enddb;
352 if (dbnum == -1) {
353 startdb = 0;
354 enddb = server.dbnum-1;
355 } else {
356 startdb = enddb = dbnum;
357 }
358
359 for (int j = startdb; j <= enddb; j++) {
360 removed += dictSize(server.db[j].dict);
361 if (async) {
362 emptyDbAsync(&server.db[j]);
363 } else {
364 dictEmpty(server.db[j].dict,callback);
365 dictEmpty(server.db[j].expires,callback);
366 }
367 }
368 if (server.cluster_enabled) {
369 if (async) {
370 slotToKeyFlushAsync();
371 } else {
372 slotToKeyFlush();
373 }
374 }
375 if (dbnum == -1) flushSlaveKeysWithExpireList();
376 return removed;
377 }
378
/* Make the database number 'id' the current database of client 'c'.
 * Returns C_OK on success, or C_ERR (leaving the client untouched) if 'id'
 * is negative or not smaller than the number of configured databases. */
int selectDb(client *c, int id) {
    int in_range = (id >= 0 && id < server.dbnum);

    if (!in_range) return C_ERR;

    c->db = &server.db[id];
    return C_OK;
}
385
386 /*-----------------------------------------------------------------------------
387 * Hooks for key space changes.
388 *
389 * Every time a key in the database is modified the function
390 * signalModifiedKey() is called.
391 *
 * Every time a DB is flushed the function signalFlushedDb() is called.
393 *----------------------------------------------------------------------------*/
394
/* Hook to call every time 'key' is modified in 'db': it forwards the event
 * to touchWatchedKey(). */
void signalModifiedKey(redisDb *db, robj *key)
{
    touchWatchedKey(db, key);
}
398
/* Hook to call every time a DB is flushed: it forwards the event to
 * touchWatchedKeysOnFlush(). Callers in this file pass -1 as 'dbid' when
 * all the databases are flushed. */
void signalFlushedDb(int dbid)
{
    touchWatchedKeysOnFlush(dbid);
}
402
403 /*-----------------------------------------------------------------------------
404 * Type agnostic commands operating on the key space
405 *----------------------------------------------------------------------------*/
406
407 /* Return the set of flags to use for the emptyDb() call for FLUSHALL
408 * and FLUSHDB commands.
409 *
410 * Currently the command just attempts to parse the "ASYNC" option. It
411 * also checks if the command arity is wrong.
412 *
413 * On success C_OK is returned and the flags are stored in *flags, otherwise
414 * C_ERR is returned and the function sends an error to the client. */
getFlushCommandFlags(client * c,int * flags)415 int getFlushCommandFlags(client *c, int *flags) {
416 /* Parse the optional ASYNC option. */
417 if (c->argc > 1) {
418 if (c->argc > 2 || strcasecmp(c->argv[1]->ptr,"async")) {
419 addReply(c,shared.syntaxerr);
420 return C_ERR;
421 }
422 *flags = EMPTYDB_ASYNC;
423 } else {
424 *flags = EMPTYDB_NO_FLAGS;
425 }
426 return C_OK;
427 }
428
429 /* FLUSHDB [ASYNC]
430 *
431 * Flushes the currently SELECTed Redis DB. */
flushdbCommand(client * c)432 void flushdbCommand(client *c) {
433 int flags;
434
435 if (getFlushCommandFlags(c,&flags) == C_ERR) return;
436 signalFlushedDb(c->db->id);
437 server.dirty += emptyDb(c->db->id,flags,NULL);
438 addReply(c,shared.ok);
439 }
440
441 /* FLUSHALL [ASYNC]
442 *
443 * Flushes the whole server data set. */
flushallCommand(client * c)444 void flushallCommand(client *c) {
445 int flags;
446
447 if (getFlushCommandFlags(c,&flags) == C_ERR) return;
448 signalFlushedDb(-1);
449 server.dirty += emptyDb(-1,flags,NULL);
450 addReply(c,shared.ok);
451 if (server.rdb_child_pid != -1) {
452 kill(server.rdb_child_pid,SIGUSR1);
453 rdbRemoveTempFile(server.rdb_child_pid);
454 }
455 if (server.saveparamslen > 0) {
456 /* Normally rdbSave() will reset dirty, but we don't want this here
457 * as otherwise FLUSHALL will not be replicated nor put into the AOF. */
458 int saved_dirty = server.dirty;
459 rdbSaveInfo rsi, *rsiptr;
460 rsiptr = rdbPopulateSaveInfo(&rsi);
461 rdbSave(server.rdb_filename,rsiptr);
462 server.dirty = saved_dirty;
463 }
464 server.dirty++;
465 }
466
467 /* This command implements DEL and LAZYDEL. */
delGenericCommand(client * c,int lazy)468 void delGenericCommand(client *c, int lazy) {
469 int numdel = 0, j;
470
471 for (j = 1; j < c->argc; j++) {
472 expireIfNeeded(c->db,c->argv[j]);
473 int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
474 dbSyncDelete(c->db,c->argv[j]);
475 if (deleted) {
476 signalModifiedKey(c->db,c->argv[j]);
477 notifyKeyspaceEvent(NOTIFY_GENERIC,
478 "del",c->argv[j],c->db->id);
479 server.dirty++;
480 numdel++;
481 }
482 }
483 addReplyLongLong(c,numdel);
484 }
485
/* DEL key [key ...]: delGenericCommand() in non-lazy mode. */
void delCommand(client *c) {
    const int lazy = 0;
    delGenericCommand(c,lazy);
}
489
/* UNLINK key [key ...]: delGenericCommand() in lazy mode. */
void unlinkCommand(client *c) {
    const int lazy = 1;
    delGenericCommand(c,lazy);
}
493
494 /* EXISTS key1 key2 ... key_N.
495 * Return value is the number of keys existing. */
existsCommand(client * c)496 void existsCommand(client *c) {
497 long long count = 0;
498 int j;
499
500 for (j = 1; j < c->argc; j++) {
501 if (lookupKeyRead(c->db,c->argv[j])) count++;
502 }
503 addReplyLongLong(c,count);
504 }
505
/* SELECT index
 *
 * Switch the client to the database with the given index.
 *
 * Errors replied to the client:
 * - "invalid DB index" when the argument is not parsed as a long.
 * - "SELECT is not allowed in cluster mode" for any index but 0 when
 *   cluster mode is enabled.
 * - "DB index is out of range" when the index is not a valid DB number. */
void selectCommand(client *c) {
    long id;

    if (getLongFromObjectOrReply(c, c->argv[1], &id,
        "invalid DB index") != C_OK)
        return;

    if (server.cluster_enabled && id != 0) {
        addReplyError(c,"SELECT is not allowed in cluster mode");
        return;
    }

    /* selectDb() takes an int: reject values that do not fit before the
     * conversion, otherwise a huge index could be truncated into a valid
     * one (for instance 4294967296 would select DB 0 where long is 64 bit).
     * moveCommand() performs the same check on its DB argument. */
    if (id < INT_MIN || id > INT_MAX || selectDb(c,(int)id) == C_ERR) {
        addReplyError(c,"DB index is out of range");
    } else {
        addReply(c,shared.ok);
    }
}
523
/* RANDOMKEY
 *
 * Reply with a random key name of the current DB, or with a null bulk if
 * dbRandomKey() finds no key. */
void randomkeyCommand(client *c) {
    robj *picked = dbRandomKey(c->db);

    if (picked == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }

    addReplyBulk(c,picked);
    /* dbRandomKey() created the object for us: release it. */
    decrRefCount(picked);
}
535
/* KEYS pattern
 *
 * Reply with the names of all the keys of the current DB matching the glob
 * style 'pattern' and that are not expired according to keyIsExpired().
 * Since the number of elements is not known in advance, the multi bulk
 * length is deferred and set at the end. */
void keysCommand(client *c) {
    sds pattern = c->argv[1]->ptr;
    int plen = sdslen(pattern);
    /* The pattern "*" matches everything: skip the matching step. */
    int match_all = (pattern[0] == '*' && pattern[1] == '\0');
    unsigned long emitted = 0;
    void *deferred_len = addDeferredMultiBulkLength(c);
    dictIterator *iter = dictGetSafeIterator(c->db->dict);
    dictEntry *entry;

    while ((entry = dictNext(iter)) != NULL) {
        sds name = dictGetKey(entry);

        if (!match_all &&
            !stringmatchlen(pattern,plen,name,sdslen(name),0))
        {
            continue;
        }

        robj *nameobj = createStringObject(name,sdslen(name));
        if (!keyIsExpired(c->db,nameobj)) {
            addReplyBulk(c,nameobj);
            emitted++;
        }
        decrRefCount(nameobj);
    }
    dictReleaseIterator(iter);
    setDeferredMultiBulkLength(c,deferred_len,emitted);
}
562
563 /* This callback is used by scanGenericCommand in order to collect elements
564 * returned by the dictionary iterator into a list. */
scanCallback(void * privdata,const dictEntry * de)565 void scanCallback(void *privdata, const dictEntry *de) {
566 void **pd = (void**) privdata;
567 list *keys = pd[0];
568 robj *o = pd[1];
569 robj *key, *val = NULL;
570
571 if (o == NULL) {
572 sds sdskey = dictGetKey(de);
573 key = createStringObject(sdskey, sdslen(sdskey));
574 } else if (o->type == OBJ_SET) {
575 sds keysds = dictGetKey(de);
576 key = createStringObject(keysds,sdslen(keysds));
577 } else if (o->type == OBJ_HASH) {
578 sds sdskey = dictGetKey(de);
579 sds sdsval = dictGetVal(de);
580 key = createStringObject(sdskey,sdslen(sdskey));
581 val = createStringObject(sdsval,sdslen(sdsval));
582 } else if (o->type == OBJ_ZSET) {
583 sds sdskey = dictGetKey(de);
584 key = createStringObject(sdskey,sdslen(sdskey));
585 val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
586 } else {
587 serverPanic("Type not handled in SCAN callback.");
588 }
589
590 listAddNodeTail(keys, key);
591 if (val) listAddNodeTail(keys, val);
592 }
593
594 /* Try to parse a SCAN cursor stored at object 'o':
595 * if the cursor is valid, store it as unsigned integer into *cursor and
596 * returns C_OK. Otherwise return C_ERR and send an error to the
597 * client. */
parseScanCursorOrReply(client * c,robj * o,unsigned long * cursor)598 int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
599 char *eptr;
600
601 /* Use strtoul() because we need an *unsigned* long, so
602 * getLongLongFromObject() does not cover the whole cursor space. */
603 errno = 0;
604 *cursor = strtoul(o->ptr, &eptr, 10);
605 if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE)
606 {
607 addReplyError(c, "invalid cursor");
608 return C_ERR;
609 }
610 return C_OK;
611 }
612
/* This function implements the scan family of commands: the keyspace scan
 * (see scanCommand()) and the per-object scans of Set, Hash and Sorted Set
 * values.
 *
 * If object 'o' is passed, then it must be a Set, Hash or Sorted Set object,
 * otherwise if 'o' is NULL the command will operate on the dictionary
 * associated with the current database.
 *
 * When 'o' is not NULL the function assumes that the first argument in
 * the client arguments vector is a key so it skips it before iterating
 * in order to parse options.
 *
 * 'cursor' is the already parsed cursor to start the iteration from.
 *
 * In the case of a Hash object the function returns both the field and value
 * of every element of the Hash; for a Sorted Set both the element and its
 * score.
 *
 * The reply is a two elements multi bulk: the new cursor, and the list of
 * collected elements. On option parsing errors an error is replied
 * instead. */
void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
    int i, j;
    list *keys = listCreate();
    listNode *node, *nextnode;
    long count = 10;
    sds pat = NULL;
    int patlen = 0, use_pattern = 0;
    dict *ht;

    /* Object must be NULL (to iterate keys names), or the type of the object
     * must be Set, Sorted Set, or Hash. */
    serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
                 o->type == OBJ_ZSET);

    /* Set i to the first option argument. The previous one is the cursor. */
    i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */

    /* Step 1: Parse options. */
    while (i < c->argc) {
        /* j is the number of arguments left, including the option name:
         * both COUNT and MATCH need one more argument after the name. */
        j = c->argc - i;
        if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
            if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
                != C_OK)
            {
                goto cleanup;
            }

            if (count < 1) {
                addReply(c,shared.syntaxerr);
                goto cleanup;
            }

            i += 2;
        } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
            pat = c->argv[i+1]->ptr;
            patlen = sdslen(pat);

            /* The pattern always matches if it is exactly "*", so it is
             * equivalent to disabling it. */
            use_pattern = !(pat[0] == '*' && patlen == 1);

            i += 2;
        } else {
            addReply(c,shared.syntaxerr);
            goto cleanup;
        }
    }

    /* Step 2: Iterate the collection.
     *
     * Note that if the object is encoded with a ziplist, intset, or any other
     * representation that is not a hash table, we are sure that it is also
     * composed of a small number of elements. So to avoid taking state we
     * just return everything inside the object in a single call, setting the
     * cursor to zero to signal the end of the iteration. */

    /* Handle the case of a hash table. */
    ht = NULL;
    if (o == NULL) {
        ht = c->db->dict;
    } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
        ht = o->ptr;
    } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
        ht = o->ptr;
        /* NOTE(review): no overflow check here nor for count*10 below, a
         * huge user supplied COUNT can overflow the signed long. */
        count *= 2; /* We return key / value for this type. */
    } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = o->ptr;
        ht = zs->dict;
        count *= 2; /* We return key / value for this type. */
    }

    if (ht) {
        void *privdata[2];
        /* We set the max number of iterations to ten times the specified
         * COUNT, so if the hash table is in a pathological state (very
         * sparsely populated) we avoid to block too much time at the cost
         * of returning no or very few elements. */
        long maxiterations = count*10;

        /* We pass two pointers to the callback: the list to which it will
         * add new elements, and the object containing the dictionary so that
         * it is possible to fetch more data in a type-dependent way. */
        privdata[0] = keys;
        privdata[1] = o;
        /* Stop when the scan is complete (cursor back to zero), when the
         * iterations budget is over, or when enough elements were
         * collected. */
        do {
            cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
        } while (cursor &&
                 maxiterations-- &&
                 listLength(keys) < (unsigned long)count);
    } else if (o->type == OBJ_SET) {
        /* Set not encoded as hash table: read every integer with
         * intsetGet() until it reports there are no more elements. */
        int pos = 0;
        int64_t ll;

        while(intsetGet(o->ptr,pos++,&ll))
            listAddNodeTail(keys,createStringObjectFromLongLong(ll));
        cursor = 0;
    } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
        /* Hash or sorted set not encoded as hash table: walk the ziplist
         * from the first entry, appending each entry in order. */
        unsigned char *p = ziplistIndex(o->ptr,0);
        unsigned char *vstr;
        unsigned int vlen;
        long long vll;

        while(p) {
            ziplistGet(p,&vstr,&vlen,&vll);
            /* An entry is either a string (vstr != NULL) or an integer. */
            listAddNodeTail(keys,
                (vstr != NULL) ? createStringObject((char*)vstr,vlen) :
                                 createStringObjectFromLongLong(vll));
            p = ziplistNext(o->ptr,p);
        }
        cursor = 0;
    } else {
        serverPanic("Not handled encoding in SCAN.");
    }

    /* Step 3: Filter elements. */
    node = listFirst(keys);
    while (node) {
        robj *kobj = listNodeValue(node);
        /* Fetch the next node now: the current one may be deleted. */
        nextnode = listNextNode(node);
        int filter = 0;

        /* Filter element if it does not match the pattern. */
        if (!filter && use_pattern) {
            if (sdsEncodedObject(kobj)) {
                if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
                    filter = 1;
            } else {
                /* Integer encoded object: match against its decimal
                 * representation. */
                char buf[LONG_STR_SIZE];
                int len;

                serverAssert(kobj->encoding == OBJ_ENCODING_INT);
                len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
                if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
            }
        }

        /* Filter element if it is an expired key. */
        if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;

        /* Remove the element and its associated value if needed. */
        if (filter) {
            decrRefCount(kobj);
            listDelNode(keys, node);
        }

        /* If this is a hash or a sorted set, we have a flat list of
         * key-value elements, so if this element was filtered, remove the
         * value, or skip it if it was not filtered: we only match keys. */
        if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
            node = nextnode;
            nextnode = listNextNode(node);
            if (filter) {
                kobj = listNodeValue(node);
                decrRefCount(kobj);
                listDelNode(keys, node);
            }
        }
        node = nextnode;
    }

    /* Step 4: Reply to the client. */
    addReplyMultiBulkLen(c, 2);
    addReplyBulkLongLong(c,cursor);

    addReplyMultiBulkLen(c, listLength(keys));
    /* Consume the list from the head, releasing each object once sent. */
    while ((node = listFirst(keys)) != NULL) {
        robj *kobj = listNodeValue(node);
        addReplyBulk(c, kobj);
        decrRefCount(kobj);
        listDelNode(keys, node);
    }

cleanup:
    /* Objects still in the list (error paths) are released together with
     * the list via the free method. */
    listSetFreeMethod(keys,decrRefCountVoid);
    listRelease(keys);
}
800
801 /* The SCAN command completely relies on scanGenericCommand. */
scanCommand(client * c)802 void scanCommand(client *c) {
803 unsigned long cursor;
804 if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
805 scanGenericCommand(c,NULL,cursor);
806 }
807
/* DBSIZE: reply with the number of entries in the main dictionary of the
 * current DB. */
void dbsizeCommand(client *c) {
    long long nkeys = dictSize(c->db->dict);
    addReplyLongLong(c,nkeys);
}
811
/* LASTSAVE: reply with the value of server.lastsave as an integer. */
void lastsaveCommand(client *c) {
    long long when = server.lastsave;
    addReplyLongLong(c,when);
}
815
/* TYPE key
 *
 * Reply with a status holding the name of the type of the value stored at
 * key, or "none" if the key is not found. For module values the name is the
 * one of the module type. The key is looked up with LOOKUP_NOTOUCH, so its
 * last access time is not altered. */
void typeCommand(client *c) {
    char *name = "none";
    robj *o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);

    if (o != NULL) {
        if (o->type == OBJ_STRING) {
            name = "string";
        } else if (o->type == OBJ_LIST) {
            name = "list";
        } else if (o->type == OBJ_SET) {
            name = "set";
        } else if (o->type == OBJ_ZSET) {
            name = "zset";
        } else if (o->type == OBJ_HASH) {
            name = "hash";
        } else if (o->type == OBJ_STREAM) {
            name = "stream";
        } else if (o->type == OBJ_MODULE) {
            moduleValue *mv = o->ptr;
            name = mv->type->name;
        } else {
            name = "unknown";
        }
    }
    addReplyStatus(c,name);
}
840
/* SHUTDOWN [NOSAVE|SAVE]
 *
 * Try to shut the server down via prepareForShutdown(). On success the
 * process exits and no reply is sent; otherwise an error is replied. More
 * than one argument, or an unknown option, is a syntax error. */
void shutdownCommand(client *c) {
    int flags = 0;

    if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    }

    if (c->argc == 2) {
        char *opt = c->argv[1]->ptr;

        if (strcasecmp(opt,"nosave") == 0) {
            flags |= SHUTDOWN_NOSAVE;
        } else if (strcasecmp(opt,"save") == 0) {
            flags |= SHUTDOWN_SAVE;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    /* If SHUTDOWN arrives while the server is loading a dataset in memory,
     * no attempt to save must be performed on shutdown, otherwise the
     * current DB could be overwritten with half-read data.
     *
     * The same is done in Sentinel mode: the SAVE flag is cleared and
     * NOSAVE is forced. */
    if (server.loading || server.sentinel_mode) {
        flags &= ~SHUTDOWN_SAVE;
        flags |= SHUTDOWN_NOSAVE;
    }

    if (prepareForShutdown(flags) == C_OK) exit(0);
    addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
}
868
/* Implementation shared by RENAME (nx == 0) and RENAMENX (nx != 0).
 *
 * The value stored at the source key (first argument) is moved to the
 * destination key (second argument) together with its expire, if any.
 * With 'nx' set nothing is done if the destination already exists, and the
 * reply is an integer (1 renamed, 0 not renamed) instead of OK.
 *
 * A missing source key is always an error, even when source and
 * destination are the same key. */
void renameGenericCommand(client *c, int nx) {
    robj *srckey = c->argv[1];
    robj *dstkey = c->argv[2];

    /* When source and destination are the same key no operation is
     * performed if the key exists, but we still have to reply with an
     * error if it does not: so just remember it for now. */
    int same = (sdscmp(srckey->ptr,dstkey->ptr) == 0);

    robj *o = lookupKeyWriteOrReply(c,srckey,shared.nokeyerr);
    if (o == NULL) return;

    if (same) {
        addReply(c,nx ? shared.czero : shared.ok);
        return;
    }

    /* Retain the value: it has to survive the deletion of the source key
     * performed later. */
    incrRefCount(o);
    long long expire = getExpire(c->db,srckey);

    if (lookupKeyWrite(c->db,dstkey) != NULL) {
        if (nx) {
            decrRefCount(o);
            addReply(c,shared.czero);
            return;
        }
        /* Overwrite: delete the old key before creating the new one
         * with the same name. */
        dbDelete(c->db,dstkey);
    }

    dbAdd(c->db,dstkey,o);
    if (expire != -1) setExpire(c,c->db,dstkey,expire);
    dbDelete(c->db,srckey);

    signalModifiedKey(c->db,srckey);
    signalModifiedKey(c->db,dstkey);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",srckey,c->db->id);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",dstkey,c->db->id);
    server.dirty++;
    addReply(c,nx ? shared.cone : shared.ok);
}
910
/* RENAME key newkey: renameGenericCommand() overwriting the destination. */
void renameCommand(client *c) {
    const int nx = 0;
    renameGenericCommand(c,nx);
}
914
/* RENAMENX key newkey: renameGenericCommand() refusing to overwrite an
 * existing destination. */
void renamenxCommand(client *c) {
    const int nx = 1;
    renameGenericCommand(c,nx);
}
918
/* MOVE key db
 *
 * Move 'key' from the currently selected DB to the DB with the given
 * number, preserving its expire if any.
 *
 * Replies:
 * - 1 if the key was moved.
 * - 0 if the key does not exist in the source DB or already exists in the
 *   target DB.
 * - An error in cluster mode, when the DB number is invalid or out of
 *   range, or when source and target DB are the same.
 *
 * When the key is moved, clients WATCHing it in either DB are signaled, as
 * done by the other commands that remove or create keys. */
void moveCommand(client *c) {
    robj *o;
    redisDb *src, *dst;
    int srcid;
    long long dbid, expire;

    if (server.cluster_enabled) {
        addReplyError(c,"MOVE is not allowed in cluster mode");
        return;
    }

    /* Obtain source and target DB pointers */
    src = c->db;
    srcid = c->db->id;

    /* The target DB pointer is obtained by temporarily selecting it: the
     * range check guarantees the value fits the int selectDb() takes. */
    if (getLongLongFromObject(c->argv[2],&dbid) == C_ERR ||
        dbid < INT_MIN || dbid > INT_MAX ||
        selectDb(c,dbid) == C_ERR)
    {
        addReply(c,shared.outofrangeerr);
        return;
    }
    dst = c->db;
    selectDb(c,srcid); /* Back to the source DB */

    /* If the user is moving using as target the same
     * DB as the source DB it is probably an error. */
    if (src == dst) {
        addReply(c,shared.sameobjecterr);
        return;
    }

    /* Check if the element exists and get a reference */
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (!o) {
        addReply(c,shared.czero);
        return;
    }
    expire = getExpire(c->db,c->argv[1]);

    /* Return zero if the key already exists in the target DB */
    if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
        addReply(c,shared.czero);
        return;
    }
    dbAdd(dst,c->argv[1],o);
    if (expire != -1) setExpire(c,dst,c->argv[1],expire);
    /* The value is now referenced by the target DB as well: retain it so
     * that deleting the source entry below does not free it. */
    incrRefCount(o);

    /* OK! key moved, free the entry in the source DB */
    dbDelete(src,c->argv[1]);

    /* The key disappeared from the source DB and appeared in the target
     * one: both events must invalidate the transactions of clients
     * WATCHing the key in the respective DB. */
    signalModifiedKey(src,c->argv[1]);
    signalModifiedKey(dst,c->argv[1]);
    server.dirty++;
    addReply(c,shared.cone);
}
973
974 /* Helper function for dbSwapDatabases(): scans the list of keys that have
975 * one or more blocked clients for B[LR]POP or other blocking commands
976 * and signal the keys as ready if they are of the right type. See the comment
977 * where the function is used for more info. */
scanDatabaseForReadyLists(redisDb * db)978 void scanDatabaseForReadyLists(redisDb *db) {
979 dictEntry *de;
980 dictIterator *di = dictGetSafeIterator(db->blocking_keys);
981 while((de = dictNext(di)) != NULL) {
982 robj *key = dictGetKey(de);
983 robj *value = lookupKey(db,key,LOOKUP_NOTOUCH);
984 if (value && (value->type == OBJ_LIST ||
985 value->type == OBJ_STREAM ||
986 value->type == OBJ_ZSET))
987 signalKeyAsReady(db, key);
988 }
989 dictReleaseIterator(di);
990 }
991
992 /* Swap two databases at runtime so that all clients will magically see
993 * the new database even if already connected. Note that the client
994 * structure c->db points to a given DB, so we need to be smarter and
995 * swap the underlying referenced structures, otherwise we would need
996 * to fix all the references to the Redis DB structure.
997 *
998 * Returns C_ERR if at least one of the DB ids are out of range, otherwise
999 * C_OK is returned. */
dbSwapDatabases(int id1,int id2)1000 int dbSwapDatabases(int id1, int id2) {
1001 if (id1 < 0 || id1 >= server.dbnum ||
1002 id2 < 0 || id2 >= server.dbnum) return C_ERR;
1003 if (id1 == id2) return C_OK;
1004 redisDb aux = server.db[id1];
1005 redisDb *db1 = &server.db[id1], *db2 = &server.db[id2];
1006
1007 /* Swap hash tables. Note that we don't swap blocking_keys,
1008 * ready_keys and watched_keys, since we want clients to
1009 * remain in the same DB they were. */
1010 db1->dict = db2->dict;
1011 db1->expires = db2->expires;
1012 db1->avg_ttl = db2->avg_ttl;
1013
1014 db2->dict = aux.dict;
1015 db2->expires = aux.expires;
1016 db2->avg_ttl = aux.avg_ttl;
1017
1018 /* Now we need to handle clients blocked on lists: as an effect
1019 * of swapping the two DBs, a client that was waiting for list
1020 * X in a given DB, may now actually be unblocked if X happens
1021 * to exist in the new version of the DB, after the swap.
1022 *
1023 * However normally we only do this check for efficiency reasons
1024 * in dbAdd() when a list is created. So here we need to rescan
1025 * the list of clients blocked on lists and signal lists as ready
1026 * if needed. */
1027 scanDatabaseForReadyLists(db1);
1028 scanDatabaseForReadyLists(db2);
1029 return C_OK;
1030 }
1031
1032 /* SWAPDB db1 db2 */
swapdbCommand(client * c)1033 void swapdbCommand(client *c) {
1034 long id1, id2;
1035
1036 /* Not allowed in cluster mode: we have just DB 0 there. */
1037 if (server.cluster_enabled) {
1038 addReplyError(c,"SWAPDB is not allowed in cluster mode");
1039 return;
1040 }
1041
1042 /* Get the two DBs indexes. */
1043 if (getLongFromObjectOrReply(c, c->argv[1], &id1,
1044 "invalid first DB index") != C_OK)
1045 return;
1046
1047 if (getLongFromObjectOrReply(c, c->argv[2], &id2,
1048 "invalid second DB index") != C_OK)
1049 return;
1050
1051 /* Swap... */
1052 if (dbSwapDatabases(id1,id2) == C_ERR) {
1053 addReplyError(c,"DB index is out of range");
1054 return;
1055 } else {
1056 server.dirty++;
1057 addReply(c,shared.ok);
1058 }
1059 }
1060
1061 /*-----------------------------------------------------------------------------
1062 * Expires API
1063 *----------------------------------------------------------------------------*/
1064
/* Remove the expire associated with 'key' in 'db'.
 *
 * The key must exist in the main dictionary (this is asserted): an expire
 * may only be removed if there is a corresponding entry in the main dict,
 * otherwise the key would never be freed.
 *
 * Returns 1 if an entry was deleted from the expires dictionary,
 * 0 otherwise. */
int removeExpire(redisDb *db, robj *key) {
    serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);

    if (dictDelete(db->expires,key->ptr) != DICT_OK) return 0;
    return 1;
}
1071
1072 /* Set an expire to the specified key. If the expire is set in the context
1073 * of an user calling a command 'c' is the client, otherwise 'c' is set
1074 * to NULL. The 'when' parameter is the absolute unix time in milliseconds
1075 * after which the key will no longer be considered valid. */
setExpire(client * c,redisDb * db,robj * key,long long when)1076 void setExpire(client *c, redisDb *db, robj *key, long long when) {
1077 dictEntry *kde, *de;
1078
1079 /* Reuse the sds from the main dict in the expire dict */
1080 kde = dictFind(db->dict,key->ptr);
1081 serverAssertWithInfo(NULL,key,kde != NULL);
1082 de = dictAddOrFind(db->expires,dictGetKey(kde));
1083 dictSetSignedIntegerVal(de,when);
1084
1085 int writable_slave = server.masterhost && server.repl_slave_ro == 0;
1086 if (c && writable_slave && !(c->flags & CLIENT_MASTER))
1087 rememberSlaveKeyWithExpire(db,key);
1088 }
1089
1090 /* Return the expire time of the specified key, or -1 if no expire
1091 * is associated with this key (i.e. the key is non volatile) */
getExpire(redisDb * db,robj * key)1092 long long getExpire(redisDb *db, robj *key) {
1093 dictEntry *de;
1094
1095 /* No expire? return ASAP */
1096 if (dictSize(db->expires) == 0 ||
1097 (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
1098
1099 /* The entry was found in the expire dict, this means it should also
1100 * be present in the main dict (safety check). */
1101 serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
1102 return dictGetSignedIntegerVal(de);
1103 }
1104
1105 /* Propagate expires into slaves and the AOF file.
1106 * When a key expires in the master, a DEL operation for this key is sent
1107 * to all the slaves and the AOF file if enabled.
1108 *
1109 * This way the key expiry is centralized in one place, and since both
1110 * AOF and the master->slave link guarantee operation ordering, everything
1111 * will be consistent even if we allow write operations against expiring
1112 * keys. */
propagateExpire(redisDb * db,robj * key,int lazy)1113 void propagateExpire(redisDb *db, robj *key, int lazy) {
1114 robj *argv[2];
1115
1116 argv[0] = lazy ? shared.unlink : shared.del;
1117 argv[1] = key;
1118 incrRefCount(argv[0]);
1119 incrRefCount(argv[1]);
1120
1121 if (server.aof_state != AOF_OFF)
1122 feedAppendOnlyFile(server.delCommand,db->id,argv,2);
1123 replicationFeedSlaves(server.slaves,db->id,argv,2);
1124
1125 decrRefCount(argv[0]);
1126 decrRefCount(argv[1]);
1127 }
1128
1129 /* Check if the key is expired. */
keyIsExpired(redisDb * db,robj * key)1130 int keyIsExpired(redisDb *db, robj *key) {
1131 mstime_t when = getExpire(db,key);
1132
1133 if (when < 0) return 0; /* No expire for this key */
1134
1135 /* Don't expire anything while loading. It will be done later. */
1136 if (server.loading) return 0;
1137
1138 /* If we are in the context of a Lua script, we pretend that time is
1139 * blocked to when the Lua script started. This way a key can expire
1140 * only the first time it is accessed and not in the middle of the
1141 * script execution, making propagation to slaves / AOF consistent.
1142 * See issue #1525 on Github for more information. */
1143 mstime_t now = server.lua_caller ? server.lua_time_start : mstime();
1144
1145 return now > when;
1146 }
1147
1148 /* This function is called when we are going to perform some operation
1149 * in a given key, but such key may be already logically expired even if
1150 * it still exists in the database. The main way this function is called
1151 * is via lookupKey*() family of functions.
1152 *
1153 * The behavior of the function depends on the replication role of the
1154 * instance, because slave instances do not expire keys, they wait
1155 * for DELs from the master for consistency matters. However even
1156 * slaves will try to have a coherent return value for the function,
1157 * so that read commands executed in the slave side will be able to
1158 * behave like if the key is expired even if still present (because the
1159 * master has yet to propagate the DEL).
1160 *
1161 * In masters as a side effect of finding a key which is expired, such
1162 * key will be evicted from the database. Also this may trigger the
1163 * propagation of a DEL/UNLINK command in AOF / replication stream.
1164 *
1165 * The return value of the function is 0 if the key is still valid,
1166 * otherwise the function returns 1 if the key is expired. */
expireIfNeeded(redisDb * db,robj * key)1167 int expireIfNeeded(redisDb *db, robj *key) {
1168 if (!keyIsExpired(db,key)) return 0;
1169
1170 /* If we are running in the context of a slave, instead of
1171 * evicting the expired key from the database, we return ASAP:
1172 * the slave key expiration is controlled by the master that will
1173 * send us synthesized DEL operations for expired keys.
1174 *
1175 * Still we try to return the right information to the caller,
1176 * that is, 0 if we think the key should be still valid, 1 if
1177 * we think the key is expired at this time. */
1178 if (server.masterhost != NULL) return 1;
1179
1180 /* Delete the key */
1181 server.stat_expiredkeys++;
1182 propagateExpire(db,key,server.lazyfree_lazy_expire);
1183 notifyKeyspaceEvent(NOTIFY_EXPIRED,
1184 "expired",key,db->id);
1185 return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
1186 dbSyncDelete(db,key);
1187 }
1188
1189 /* -----------------------------------------------------------------------------
1190 * API to get key arguments from commands
1191 * ---------------------------------------------------------------------------*/
1192
1193 /* The base case is to use the keys position as given in the command table
1194 * (firstkey, lastkey, step). */
getKeysUsingCommandTable(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1195 int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, int *numkeys) {
1196 int j, i = 0, last, *keys;
1197 UNUSED(argv);
1198
1199 if (cmd->firstkey == 0) {
1200 *numkeys = 0;
1201 return NULL;
1202 }
1203
1204 last = cmd->lastkey;
1205 if (last < 0) last = argc+last;
1206 keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
1207 for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
1208 if (j >= argc) {
1209 /* Modules commands, and standard commands with a not fixed number
1210 * of arguments (negative arity parameter) do not have dispatch
1211 * time arity checks, so we need to handle the case where the user
1212 * passed an invalid number of arguments here. In this case we
1213 * return no keys and expect the command implementation to report
1214 * an arity or syntax error. */
1215 if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
1216 zfree(keys);
1217 *numkeys = 0;
1218 return NULL;
1219 } else {
1220 serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
1221 }
1222 }
1223 keys[i++] = j;
1224 }
1225 *numkeys = i;
1226 return keys;
1227 }
1228
1229 /* Return all the arguments that are keys in the command passed via argc / argv.
1230 *
1231 * The command returns the positions of all the key arguments inside the array,
1232 * so the actual return value is an heap allocated array of integers. The
1233 * length of the array is returned by reference into *numkeys.
1234 *
1235 * 'cmd' must be point to the corresponding entry into the redisCommand
1236 * table, according to the command name in argv[0].
1237 *
1238 * This function uses the command table if a command-specific helper function
1239 * is not required, otherwise it calls the command-specific function. */
getKeysFromCommand(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1240 int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1241 if (cmd->flags & CMD_MODULE_GETKEYS) {
1242 return moduleGetCommandKeysViaAPI(cmd,argv,argc,numkeys);
1243 } else if (!(cmd->flags & CMD_MODULE) && cmd->getkeys_proc) {
1244 return cmd->getkeys_proc(cmd,argv,argc,numkeys);
1245 } else {
1246 return getKeysUsingCommandTable(cmd,argv,argc,numkeys);
1247 }
1248 }
1249
/* Release the array of key positions returned by getKeysFromCommand(). */
void getKeysFreeResult(int *result) {
    int *positions = result;

    zfree(positions);
}
1254
1255 /* Helper function to extract keys from following commands:
1256 * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
1257 * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */
zunionInterGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1258 int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1259 int i, num, *keys;
1260 UNUSED(cmd);
1261
1262 num = atoi(argv[2]->ptr);
1263 /* Sanity check. Don't return any key if the command is going to
1264 * reply with syntax error. */
1265 if (num < 1 || num > (argc-3)) {
1266 *numkeys = 0;
1267 return NULL;
1268 }
1269
1270 /* Keys in z{union,inter}store come from two places:
1271 * argv[1] = storage key,
1272 * argv[3...n] = keys to intersect */
1273 keys = zmalloc(sizeof(int)*(num+1));
1274
1275 /* Add all key positions for argv[3...n] to keys[] */
1276 for (i = 0; i < num; i++) keys[i] = 3+i;
1277
1278 /* Finally add the argv[1] key position (the storage key target). */
1279 keys[num] = 1;
1280 *numkeys = num+1; /* Total keys = {union,inter} keys + storage key */
1281 return keys;
1282 }
1283
1284 /* Helper function to extract keys from the following commands:
1285 * EVAL <script> <num-keys> <key> <key> ... <key> [more stuff]
1286 * EVALSHA <script> <num-keys> <key> <key> ... <key> [more stuff] */
evalGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1287 int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1288 int i, num, *keys;
1289 UNUSED(cmd);
1290
1291 num = atoi(argv[2]->ptr);
1292 /* Sanity check. Don't return any key if the command is going to
1293 * reply with syntax error. */
1294 if (num <= 0 || num > (argc-3)) {
1295 *numkeys = 0;
1296 return NULL;
1297 }
1298
1299 keys = zmalloc(sizeof(int)*num);
1300 *numkeys = num;
1301
1302 /* Add all key positions for argv[3...n] to keys[] */
1303 for (i = 0; i < num; i++) keys[i] = 3+i;
1304
1305 return keys;
1306 }
1307
1308 /* Helper function to extract keys from the SORT command.
1309 *
1310 * SORT <sort-key> ... STORE <store-key> ...
1311 *
1312 * The first argument of SORT is always a key, however a list of options
1313 * follow in SQL-alike style. Here we parse just the minimum in order to
1314 * correctly identify keys in the "STORE" option. */
sortGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1315 int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1316 int i, j, num, *keys, found_store = 0;
1317 UNUSED(cmd);
1318
1319 num = 0;
1320 keys = zmalloc(sizeof(int)*2); /* Alloc 2 places for the worst case. */
1321
1322 keys[num++] = 1; /* <sort-key> is always present. */
1323
1324 /* Search for STORE option. By default we consider options to don't
1325 * have arguments, so if we find an unknown option name we scan the
1326 * next. However there are options with 1 or 2 arguments, so we
1327 * provide a list here in order to skip the right number of args. */
1328 struct {
1329 char *name;
1330 int skip;
1331 } skiplist[] = {
1332 {"limit", 2},
1333 {"get", 1},
1334 {"by", 1},
1335 {NULL, 0} /* End of elements. */
1336 };
1337
1338 for (i = 2; i < argc; i++) {
1339 for (j = 0; skiplist[j].name != NULL; j++) {
1340 if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) {
1341 i += skiplist[j].skip;
1342 break;
1343 } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) {
1344 /* Note: we don't increment "num" here and continue the loop
1345 * to be sure to process the *last* "STORE" option if multiple
1346 * ones are provided. This is same behavior as SORT. */
1347 found_store = 1;
1348 keys[num] = i+1; /* <store-key> */
1349 break;
1350 }
1351 }
1352 }
1353 *numkeys = num + found_store;
1354 return keys;
1355 }
1356
/* Helper function to extract keys from the MIGRATE command.
 *
 * In the plain form the only key is argv[3]. In the extended form, a
 * "KEYS" option found at position 6 or later, with argv[3] being the
 * empty string, means that the keys are all the arguments following the
 * option. */
int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
    UNUSED(cmd);

    /* Assume the obvious form. */
    int first = 3;
    int count = 1;
    int pos;

    /* But check for the extended one with the KEYS option. */
    for (pos = 6; pos < argc; pos++) {
        if (strcasecmp(argv[pos]->ptr,"keys") != 0) continue;
        if (sdslen(argv[3]->ptr) != 0) continue;

        first = pos+1;
        count = argc-first;
        break;
    }

    int *keys = zmalloc(sizeof(int)*count);
    int slot;

    for (slot = 0; slot < count; slot++) keys[slot] = first+slot;
    *numkeys = count;
    return keys;
}
1383
1384 /* Helper function to extract keys from following commands:
1385 * GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC]
1386 * [COUNT count] [STORE key] [STOREDIST key]
1387 * GEORADIUSBYMEMBER key member radius unit ... options ... */
georadiusGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1388 int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1389 int i, num, *keys;
1390 UNUSED(cmd);
1391
1392 /* Check for the presence of the stored key in the command */
1393 int stored_key = -1;
1394 for (i = 5; i < argc; i++) {
1395 char *arg = argv[i]->ptr;
1396 /* For the case when user specifies both "store" and "storedist" options, the
1397 * second key specified would override the first key. This behavior is kept
1398 * the same as in georadiusCommand method.
1399 */
1400 if ((!strcasecmp(arg, "store") || !strcasecmp(arg, "storedist")) && ((i+1) < argc)) {
1401 stored_key = i+1;
1402 i++;
1403 }
1404 }
1405 num = 1 + (stored_key == -1 ? 0 : 1);
1406
1407 /* Keys in the command come from two places:
1408 * argv[1] = key,
1409 * argv[5...n] = stored key if present
1410 */
1411 keys = zmalloc(sizeof(int) * num);
1412
1413 /* Add all key positions to keys[] */
1414 keys[0] = 1;
1415 if(num > 1) {
1416 keys[1] = stored_key;
1417 }
1418 *numkeys = num;
1419 return keys;
1420 }
1421
1422 /* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
1423 * STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N */
xreadGetKeys(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)1424 int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1425 int i, num = 0, *keys;
1426 UNUSED(cmd);
1427
1428 /* We need to parse the options of the command in order to seek the first
1429 * "STREAMS" string which is actually the option. This is needed because
1430 * "STREAMS" could also be the name of the consumer group and even the
1431 * name of the stream key. */
1432 int streams_pos = -1;
1433 for (i = 1; i < argc; i++) {
1434 char *arg = argv[i]->ptr;
1435 if (!strcasecmp(arg, "block")) {
1436 i++; /* Skip option argument. */
1437 } else if (!strcasecmp(arg, "count")) {
1438 i++; /* Skip option argument. */
1439 } else if (!strcasecmp(arg, "group")) {
1440 i += 2; /* Skip option argument. */
1441 } else if (!strcasecmp(arg, "noack")) {
1442 /* Nothing to do. */
1443 } else if (!strcasecmp(arg, "streams")) {
1444 streams_pos = i;
1445 break;
1446 } else {
1447 break; /* Syntax error. */
1448 }
1449 }
1450 if (streams_pos != -1) num = argc - streams_pos - 1;
1451
1452 /* Syntax error. */
1453 if (streams_pos == -1 || num == 0 || num % 2 != 0) {
1454 *numkeys = 0;
1455 return NULL;
1456 }
1457 num /= 2; /* We have half the keys as there are arguments because
1458 there are also the IDs, one per key. */
1459
1460 keys = zmalloc(sizeof(int) * num);
1461 for (i = streams_pos+1; i < argc-num; i++) keys[i-streams_pos-1] = i;
1462 *numkeys = num;
1463 return keys;
1464 }
1465
1466 /* Slot to Key API. This is used by Redis Cluster in order to obtain in
1467 * a fast way a key that belongs to a specified hash slot. This is useful
1468 * while rehashing the cluster and in other conditions when we need to
1469 * understand if we have keys for a given hash slot. */
/* Add ('add' non zero) or remove ('add' zero) the key from the slot to
 * keys index, updating the per-slot keys counter accordingly.
 *
 * The element stored in the radix tree is the two bytes of the hash slot,
 * most significant byte first, followed by the key name. */
void slotToKeyUpdateKey(robj *key, int add) {
    size_t keylen = sdslen(key->ptr);
    unsigned int hashslot = keyHashSlot(key->ptr,keylen);
    unsigned char stackbuf[64];
    unsigned char *indexed = stackbuf;
    size_t indexed_len = keylen+2;

    if (add)
        server.cluster->slots_keys_count[hashslot]++;
    else
        server.cluster->slots_keys_count[hashslot]--;

    /* Use the stack buffer when the element fits, the heap otherwise. */
    if (indexed_len > sizeof(stackbuf)) indexed = zmalloc(indexed_len);

    indexed[0] = (hashslot >> 8) & 0xff;
    indexed[1] = hashslot & 0xff;
    memcpy(indexed+2,key->ptr,keylen);

    if (add)
        raxInsert(server.cluster->slots_to_keys,indexed,indexed_len,NULL,NULL);
    else
        raxRemove(server.cluster->slots_to_keys,indexed,indexed_len,NULL);

    if (indexed != stackbuf) zfree(indexed);
}
1488
/* Register the key in the slot to keys index. */
void slotToKeyAdd(robj *key) {
    const int add = 1;

    slotToKeyUpdateKey(key,add);
}
1492
/* Unregister the key from the slot to keys index. */
void slotToKeyDel(robj *key) {
    const int add = 0;

    slotToKeyUpdateKey(key,add);
}
1496
slotToKeyFlush(void)1497 void slotToKeyFlush(void) {
1498 raxFree(server.cluster->slots_to_keys);
1499 server.cluster->slots_to_keys = raxNew();
1500 memset(server.cluster->slots_keys_count,0,
1501 sizeof(server.cluster->slots_keys_count));
1502 }
1503
1504 /* Pupulate the specified array of objects with keys in the specified slot.
1505 * New objects are returned to represent keys, it's up to the caller to
1506 * decrement the reference count to release the keys names. */
getKeysInSlot(unsigned int hashslot,robj ** keys,unsigned int count)1507 unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
1508 raxIterator iter;
1509 int j = 0;
1510 unsigned char indexed[2];
1511
1512 indexed[0] = (hashslot >> 8) & 0xff;
1513 indexed[1] = hashslot & 0xff;
1514 raxStart(&iter,server.cluster->slots_to_keys);
1515 raxSeek(&iter,">=",indexed,2);
1516 while(count-- && raxNext(&iter)) {
1517 if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
1518 keys[j++] = createStringObject((char*)iter.key+2,iter.key_len-2);
1519 }
1520 raxStop(&iter);
1521 return j;
1522 }
1523
1524 /* Remove all the keys in the specified hash slot.
1525 * The number of removed items is returned. */
delKeysInSlot(unsigned int hashslot)1526 unsigned int delKeysInSlot(unsigned int hashslot) {
1527 raxIterator iter;
1528 int j = 0;
1529 unsigned char indexed[2];
1530
1531 indexed[0] = (hashslot >> 8) & 0xff;
1532 indexed[1] = hashslot & 0xff;
1533 raxStart(&iter,server.cluster->slots_to_keys);
1534 while(server.cluster->slots_keys_count[hashslot]) {
1535 raxSeek(&iter,">=",indexed,2);
1536 raxNext(&iter);
1537
1538 robj *key = createStringObject((char*)iter.key+2,iter.key_len-2);
1539 dbDelete(&server.db[0],key);
1540 decrRefCount(key);
1541 j++;
1542 }
1543 raxStop(&iter);
1544 return j;
1545 }
1546
countKeysInSlot(unsigned int hashslot)1547 unsigned int countKeysInSlot(unsigned int hashslot) {
1548 return server.cluster->slots_keys_count[hashslot];
1549 }
1550