xref: /f-stack/app/redis-5.0.5/src/module.c (revision 572c4311)
1 /*
2  * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "cluster.h"
32 #include <dlfcn.h>
33 
34 #define REDISMODULE_CORE 1
35 #include "redismodule.h"
36 
37 /* --------------------------------------------------------------------------
38  * Private data structures used by the modules system. These are data
39  * structures that are never exposed to Redis Modules, except as void
40  * pointers that the module can pass back to the API calls operating on them.
41  * -------------------------------------------------------------------------- */
42 
43 /* This structure represents a module inside the system. */
44 struct RedisModule {
45     void *handle;   /* Module dlopen() handle. */
46     char *name;     /* Module name. */
47     int ver;        /* Module version. We use just progressive integers. */
48     int apiver;     /* Module API version as requested during initialization.*/
49     list *types;    /* Module data types. */
50     list *usedby;   /* List of modules using APIs from this one. */
51     list *using;    /* List of modules we use some APIs of. */
52     list *filters;  /* List of filters the module has registered. */
53     int in_call;    /* RM_Call() nesting level */
54 };
55 typedef struct RedisModule RedisModule;
56 
57 /* This represents a shared API. Shared APIs will be used to populate
58  * the server.sharedapi dictionary, mapping names of APIs exported by
59  * modules for other modules to use, to their structure specifying the
60  * function pointer that can be called. */
61 struct RedisModuleSharedAPI {
62     void *func;
63     RedisModule *module;
64 };
65 typedef struct RedisModuleSharedAPI RedisModuleSharedAPI;
66 
67 static dict *modules; /* Hash table of modules. SDS -> RedisModule ptr.*/
68 
/* One slot of the context->amqueue array: an object that has to be freed
 * when the callback returns. */
struct AutoMemEntry {
    /* The tracked object. */
    void *ptr;
    /* One of the REDISMODULE_AM_* values defined below. */
    int type;
};

/* AutoMemEntry type field values. */
#define REDISMODULE_AM_KEY    0
#define REDISMODULE_AM_STRING 1
#define REDISMODULE_AM_REPLY  2
#define REDISMODULE_AM_FREED  3 /* Explicitly freed by user already. */
#define REDISMODULE_AM_DICT   4
82 
83 /* The pool allocator block. Redis Modules can allocate memory via this special
84  * allocator that will automatically release it all once the callback returns.
85  * This means that it can only be used for ephemeral allocations. However
86  * there are two advantages for modules to use this API:
87  *
88  * 1) The memory is automatically released when the callback returns.
89  * 2) This allocator is faster for many small allocations since whole blocks
90  *    are allocated, and small pieces returned to the caller just advancing
91  *    the index of the allocation.
92  *
93  * Allocations are always rounded to the size of the void pointer in order
94  * to always return aligned memory chunks. */
95 
/* Smallest payload size of a pool block, and the maximum alignment the pool
 * allocator applies to the chunks it returns. */
#define REDISMODULE_POOL_ALLOC_MIN_SIZE (1024 * 8)
#define REDISMODULE_POOL_ALLOC_ALIGN (sizeof(void *))

/* A block of the pool allocator. Blocks are chained through 'next'. */
typedef struct RedisModulePoolAllocBlock {
    /* Number of bytes available in 'memory'. */
    uint32_t size;
    /* Number of bytes of 'memory' already consumed. */
    uint32_t used;
    /* Next block of the chain, or NULL. */
    struct RedisModulePoolAllocBlock *next;
    /* Payload area the allocations are carved from. */
    char memory[];
} RedisModulePoolAllocBlock;
105 
106 /* This structure represents the context in which Redis modules operate.
107  * Most APIs module can access, get a pointer to the context, so that the API
108  * implementation can hold state across calls, or remember what to free after
109  * the call and so forth.
110  *
111  * Note that not all the context structure is always filled with actual values
112  * but only the fields needed in a given context. */
113 
114 struct RedisModuleBlockedClient;
115 
116 struct RedisModuleCtx {
117     void *getapifuncptr;            /* NOTE: Must be the first field. */
118     struct RedisModule *module;     /* Module reference. */
119     client *client;                 /* Client calling a command. */
120     struct RedisModuleBlockedClient *blocked_client; /* Blocked client for
121                                                         thread safe context. */
122     struct AutoMemEntry *amqueue;   /* Auto memory queue of objects to free. */
123     int amqueue_len;                /* Number of slots in amqueue. */
124     int amqueue_used;               /* Number of used slots in amqueue. */
125     int flags;                      /* REDISMODULE_CTX_... flags. */
126     void **postponed_arrays;        /* To set with RM_ReplySetArrayLength(). */
127     int postponed_arrays_count;     /* Number of entries in postponed_arrays. */
128     void *blocked_privdata;         /* Privdata set when unblocking a client. */
129 
130     /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
131     int *keys_pos;
132     int keys_count;
133 
134     struct RedisModulePoolAllocBlock *pa_head;
135 };
136 typedef struct RedisModuleCtx RedisModuleCtx;
137 
138 #define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL}
139 #define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
140 #define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
141 #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
142 #define REDISMODULE_CTX_BLOCKED_REPLY (1<<3)
143 #define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
144 #define REDISMODULE_CTX_THREAD_SAFE (1<<5)
145 #define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<6)
146 
147 /* This represents a Redis key opened with RM_OpenKey(). */
148 struct RedisModuleKey {
149     RedisModuleCtx *ctx;
150     redisDb *db;
151     robj *key;      /* Key name object. */
152     robj *value;    /* Value object, or NULL if the key was not found. */
153     void *iter;     /* Iterator. */
154     int mode;       /* Opening mode. */
155 
156     /* Zset iterator. */
157     uint32_t ztype;         /* REDISMODULE_ZSET_RANGE_* */
158     zrangespec zrs;         /* Score range. */
159     zlexrangespec zlrs;     /* Lex range. */
160     uint32_t zstart;        /* Start pos for positional ranges. */
161     uint32_t zend;          /* End pos for positional ranges. */
162     void *zcurrent;         /* Zset iterator current node. */
163     int zer;                /* Zset iterator end reached flag
164                                (true if end was reached). */
165 };
166 typedef struct RedisModuleKey RedisModuleKey;
167 
168 /* RedisModuleKey 'ztype' values. */
169 #define REDISMODULE_ZSET_RANGE_NONE 0       /* This must always be 0. */
170 #define REDISMODULE_ZSET_RANGE_LEX 1
171 #define REDISMODULE_ZSET_RANGE_SCORE 2
172 #define REDISMODULE_ZSET_RANGE_POS 3
173 
174 /* Function pointer type of a function representing a command inside
175  * a Redis module. */
176 struct RedisModuleBlockedClient;
177 typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, void **argv, int argc);
178 typedef void (*RedisModuleDisconnectFunc) (RedisModuleCtx *ctx, struct RedisModuleBlockedClient *bc);
179 
180 /* This struct holds the information about a command registered by a module.*/
181 struct RedisModuleCommandProxy {
182     struct RedisModule *module;
183     RedisModuleCmdFunc func;
184     struct redisCommand *rediscmd;
185 };
186 typedef struct RedisModuleCommandProxy RedisModuleCommandProxy;
187 
188 #define REDISMODULE_REPLYFLAG_NONE 0
189 #define REDISMODULE_REPLYFLAG_TOPARSE (1<<0) /* Protocol must be parsed. */
190 #define REDISMODULE_REPLYFLAG_NESTED (1<<1)  /* Nested reply object. No proto
191                                                 or struct free. */
192 
193 /* Reply of RM_Call() function. The function is filled in a lazy
194  * way depending on the function called on the reply structure. By default
195  * only the type, proto and protolen are filled. */
196 typedef struct RedisModuleCallReply {
197     RedisModuleCtx *ctx;
198     int type;       /* REDISMODULE_REPLY_... */
199     int flags;      /* REDISMODULE_REPLYFLAG_...  */
200     size_t len;     /* Len of strings or num of elements of arrays. */
201     char *proto;    /* Raw reply protocol. An SDS string at top-level object. */
202     size_t protolen;/* Length of protocol. */
203     union {
204         const char *str; /* String pointer for string and error replies. This
205                             does not need to be freed, always points inside
206                             a reply->proto buffer of the reply object or, in
207                             case of array elements, of parent reply objects. */
208         long long ll;    /* Reply value for integer reply. */
209         struct RedisModuleCallReply *array; /* Array of sub-reply elements. */
210     } val;
211 } RedisModuleCallReply;
212 
213 /* Structure representing a blocked client. We get a pointer to such
214  * an object when blocking from modules. */
215 typedef struct RedisModuleBlockedClient {
216     client *client;  /* Pointer to the blocked client. or NULL if the client
217                         was destroyed during the life of this object. */
218     RedisModule *module;    /* Module blocking the client. */
219     RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/
220     RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */
221     RedisModuleDisconnectFunc disconnect_callback; /* Called on disconnection.*/
222     void (*free_privdata)(RedisModuleCtx*,void*);/* privdata cleanup callback.*/
223     void *privdata;     /* Module private data that may be used by the reply
224                            or timeout callback. It is set via the
225                            RedisModule_UnblockClient() API. */
226     client *reply_client;           /* Fake client used to accumulate replies
227                                        in thread safe contexts. */
228     int dbid;           /* Database number selected by the original client. */
229 } RedisModuleBlockedClient;
230 
231 static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
232 static list *moduleUnblockedClients;
233 
234 /* We need a mutex that is unlocked / relocked in beforeSleep() in order to
235  * allow thread safe contexts to execute commands at a safe moment. */
236 static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
237 
238 
239 /* Function pointer type for keyspace event notification subscriptions from modules. */
240 typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
241 
242 /* Keyspace notification subscriber information.
243  * See RM_SubscribeToKeyspaceEvents() for more information. */
244 typedef struct RedisModuleKeyspaceSubscriber {
245     /* The module subscribed to the event */
246     RedisModule *module;
247     /* Notification callback in the module*/
248     RedisModuleNotificationFunc notify_callback;
249     /* A bit mask of the events the module is interested in */
250     int event_mask;
251     /* Active flag set on entry, to avoid reentrant subscribers
252      * calling themselves */
253     int active;
254 } RedisModuleKeyspaceSubscriber;
255 
256 /* The module keyspace notification subscribers list */
257 static list *moduleKeyspaceSubscribers;
258 
259 /* Static client recycled for when we need to provide a context with a client
260  * in a situation where there is no client to provide. This avoidsallocating
261  * a new client per round. For instance this is used in the keyspace
262  * notifications, timers and cluster messages callbacks. */
263 static client *moduleFreeContextReusedClient;
264 
265 /* Data structures related to the exported dictionary data structure. */
266 typedef struct RedisModuleDict {
267     rax *rax;                       /* The radix tree. */
268 } RedisModuleDict;
269 
270 typedef struct RedisModuleDictIter {
271     RedisModuleDict *dict;
272     raxIterator ri;
273 } RedisModuleDictIter;
274 
275 typedef struct RedisModuleCommandFilterCtx {
276     RedisModuleString **argv;
277     int argc;
278 } RedisModuleCommandFilterCtx;
279 
280 typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
281 
282 typedef struct RedisModuleCommandFilter {
283     /* The module that registered the filter */
284     RedisModule *module;
285     /* Filter callback function */
286     RedisModuleCommandFilterFunc callback;
287     /* REDISMODULE_CMDFILTER_* flags */
288     int flags;
289 } RedisModuleCommandFilter;
290 
291 /* Registered filters */
292 static list *moduleCommandFilters;
293 
294 /* --------------------------------------------------------------------------
295  * Prototypes
296  * -------------------------------------------------------------------------- */
297 
298 void RM_FreeCallReply(RedisModuleCallReply *reply);
299 void RM_CloseKey(RedisModuleKey *key);
300 void autoMemoryCollect(RedisModuleCtx *ctx);
301 robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap);
302 void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx);
303 void RM_ZsetRangeStop(RedisModuleKey *kp);
304 static void zsetKeyReset(RedisModuleKey *key);
305 void RM_FreeDict(RedisModuleCtx *ctx, RedisModuleDict *d);
306 
307 /* --------------------------------------------------------------------------
308  * Heap allocation raw functions
309  * -------------------------------------------------------------------------- */
310 
/* malloc() replacement for modules: use it exactly like malloc().
 *
 * The request is served by zmalloc(), so the memory obtained this way is
 * reported in Redis INFO memory, is used for keys eviction according to
 * maxmemory settings, and in general is taken into account as memory
 * allocated by Redis. You should avoid using malloc(). */
void *RM_Alloc(size_t bytes) {
    void *chunk = zmalloc(bytes);
    return chunk;
}
318 
/* Use like calloc(). Memory allocated with this function is reported in
 * Redis INFO memory, used for keys eviction according to maxmemory settings
 * and in general is taken into account as memory allocated by Redis.
 * You should avoid using calloc() directly.
 *
 * 'nmemb' is the number of elements and 'size' the size in bytes of each
 * element. The function returns zcalloc(nmemb*size), or, like calloc(),
 * NULL when the product 'nmemb * size' cannot be represented in a size_t:
 * without this check the multiplication would wrap around and a buffer
 * smaller than the one requested would be silently returned. */
void *RM_Calloc(size_t nmemb, size_t size) {
    /* (size_t)-1 is the greatest size_t value: the product overflows
     * exactly when nmemb is greater than that value divided by size. */
    if (size != 0 && nmemb > (size_t)-1 / size) return NULL;
    return zcalloc(nmemb*size);
}
326 
/* realloc() replacement for memory obtained with RedisModule_Alloc():
 * resizes 'ptr' to 'bytes' bytes by calling zrealloc() and returns the
 * resulting pointer. */
void *RM_Realloc(void *ptr, size_t bytes) {
    void *resized = zrealloc(ptr, bytes);
    return resized;
}
331 
/* free() replacement for memory obtained by RedisModule_Alloc() and
 * RedisModule_Realloc(): the pointer is released with zfree().
 *
 * Never use RedisModule_Free() to release memory that was allocated with
 * malloc() inside your module. */
void RM_Free(void *ptr) {
    zfree(ptr);
    return;
}
338 
/* strdup() replacement: duplicates 'str' using zstrdup(), so that the copy
 * is memory of the same kind returned by RedisModule_Alloc(). */
char *RM_Strdup(const char *str) {
    char *copy = zstrdup(str);
    return copy;
}
343 
344 /* --------------------------------------------------------------------------
345  * Pool allocator
346  * -------------------------------------------------------------------------- */
347 
348 /* Release the chain of blocks used for pool allocations. */
poolAllocRelease(RedisModuleCtx * ctx)349 void poolAllocRelease(RedisModuleCtx *ctx) {
350     RedisModulePoolAllocBlock *head = ctx->pa_head, *next;
351 
352     while(head != NULL) {
353         next = head->next;
354         zfree(head);
355         head = next;
356     }
357     ctx->pa_head = NULL;
358 }
359 
360 /* Return heap allocated memory that will be freed automatically when the
361  * module callback function returns. Mostly suitable for small allocations
362  * that are short living and must be released when the callback returns
363  * anyway. The returned memory is aligned to the architecture word size
364  * if at least word size bytes are requested, otherwise it is just
365  * aligned to the next power of two, so for example a 3 bytes request is
366  * 4 bytes aligned while a 2 bytes request is 2 bytes aligned.
367  *
368  * There is no realloc style function since when this is needed to use the
369  * pool allocator is not a good idea.
370  *
371  * The function returns NULL if `bytes` is 0. */
RM_PoolAlloc(RedisModuleCtx * ctx,size_t bytes)372 void *RM_PoolAlloc(RedisModuleCtx *ctx, size_t bytes) {
373     if (bytes == 0) return NULL;
374     RedisModulePoolAllocBlock *b = ctx->pa_head;
375     size_t left = b ? b->size - b->used : 0;
376 
377     /* Fix alignment. */
378     if (left >= bytes) {
379         size_t alignment = REDISMODULE_POOL_ALLOC_ALIGN;
380         while (bytes < alignment && alignment/2 >= bytes) alignment /= 2;
381         if (b->used % alignment)
382             b->used += alignment - (b->used % alignment);
383         left = (b->used > b->size) ? 0 : b->size - b->used;
384     }
385 
386     /* Create a new block if needed. */
387     if (left < bytes) {
388         size_t blocksize = REDISMODULE_POOL_ALLOC_MIN_SIZE;
389         if (blocksize < bytes) blocksize = bytes;
390         b = zmalloc(sizeof(*b) + blocksize);
391         b->size = blocksize;
392         b->used = 0;
393         b->next = ctx->pa_head;
394         ctx->pa_head = b;
395     }
396 
397     char *retval = b->memory + b->used;
398     b->used += bytes;
399     return retval;
400 }
401 
402 /* --------------------------------------------------------------------------
403  * Helpers for modules API implementation
404  * -------------------------------------------------------------------------- */
405 
406 /* Create an empty key of the specified type. 'kp' must point to a key object
407  * opened for writing where the .value member is set to NULL because the
408  * key was found to be non existing.
409  *
410  * On success REDISMODULE_OK is returned and the key is populated with
411  * the value of the specified type. The function fails and returns
412  * REDISMODULE_ERR if:
413  *
414  * 1) The key is not open for writing.
415  * 2) The key is not empty.
416  * 3) The specified type is unknown.
417  */
moduleCreateEmptyKey(RedisModuleKey * key,int type)418 int moduleCreateEmptyKey(RedisModuleKey *key, int type) {
419     robj *obj;
420 
421     /* The key must be open for writing and non existing to proceed. */
422     if (!(key->mode & REDISMODULE_WRITE) || key->value)
423         return REDISMODULE_ERR;
424 
425     switch(type) {
426     case REDISMODULE_KEYTYPE_LIST:
427         obj = createQuicklistObject();
428         quicklistSetOptions(obj->ptr, server.list_max_ziplist_size,
429                             server.list_compress_depth);
430         break;
431     case REDISMODULE_KEYTYPE_ZSET:
432         obj = createZsetZiplistObject();
433         break;
434     case REDISMODULE_KEYTYPE_HASH:
435         obj = createHashObject();
436         break;
437     default: return REDISMODULE_ERR;
438     }
439     dbAdd(key->db,key->key,obj);
440     key->value = obj;
441     return REDISMODULE_OK;
442 }
443 
444 /* This function is called in low-level API implementation functions in order
445  * to check if the value associated with the key remained empty after an
446  * operation that removed elements from an aggregate data type.
447  *
448  * If this happens, the key is deleted from the DB and the key object state
449  * is set to the right one in order to be targeted again by write operations
450  * possibly recreating the key if needed.
451  *
452  * The function returns 1 if the key value object is found empty and is
453  * deleted, otherwise 0 is returned. */
moduleDelKeyIfEmpty(RedisModuleKey * key)454 int moduleDelKeyIfEmpty(RedisModuleKey *key) {
455     if (!(key->mode & REDISMODULE_WRITE) || key->value == NULL) return 0;
456     int isempty;
457     robj *o = key->value;
458 
459     switch(o->type) {
460     case OBJ_LIST: isempty = listTypeLength(o) == 0; break;
461     case OBJ_SET: isempty = setTypeSize(o) == 0; break;
462     case OBJ_ZSET: isempty = zsetLength(o) == 0; break;
463     case OBJ_HASH : isempty = hashTypeLength(o) == 0; break;
464     default: isempty = 0;
465     }
466 
467     if (isempty) {
468         dbDelete(key->db,key->key);
469         key->value = NULL;
470         return 1;
471     } else {
472         return 0;
473     }
474 }
475 
476 /* --------------------------------------------------------------------------
477  * Service API exported to modules
478  *
479  * Note that all the exported APIs are called RM_<funcname> in the core
480  * and RedisModule_<funcname> in the module side (defined as function
481  * pointers in redismodule.h). In this way the dynamic linker does not
482  * mess with our global function pointers, overriding it with the symbols
483  * defined in the main executable having the same names.
484  * -------------------------------------------------------------------------- */
485 
486 /* Lookup the requested module API and store the function pointer into the
487  * target pointer. The function returns REDISMODULE_ERR if there is no such
488  * named API, otherwise REDISMODULE_OK.
489  *
490  * This function is not meant to be used by modules developer, it is only
491  * used implicitly by including redismodule.h. */
RM_GetApi(const char * funcname,void ** targetPtrPtr)492 int RM_GetApi(const char *funcname, void **targetPtrPtr) {
493     dictEntry *he = dictFind(server.moduleapi, funcname);
494     if (!he) return REDISMODULE_ERR;
495     *targetPtrPtr = dictGetVal(he);
496     return REDISMODULE_OK;
497 }
498 
499 /* Free the context after the user function was called. */
moduleFreeContext(RedisModuleCtx * ctx)500 void moduleFreeContext(RedisModuleCtx *ctx) {
501     autoMemoryCollect(ctx);
502     poolAllocRelease(ctx);
503     if (ctx->postponed_arrays) {
504         zfree(ctx->postponed_arrays);
505         ctx->postponed_arrays_count = 0;
506         serverLog(LL_WARNING,
507             "API misuse detected in module %s: "
508             "RedisModule_ReplyWithArray(REDISMODULE_POSTPONED_ARRAY_LEN) "
509             "not matched by the same number of RedisModule_SetReplyArrayLen() "
510             "calls.",
511             ctx->module->name);
512     }
513     if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client);
514 }
515 
516 /* Helper function for when a command callback is called, in order to handle
517  * details needed to correctly replicate commands. */
moduleHandlePropagationAfterCommandCallback(RedisModuleCtx * ctx)518 void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
519     client *c = ctx->client;
520 
521     if (c->flags & CLIENT_LUA) return;
522 
523     /* Handle the replication of the final EXEC, since whatever a command
524      * emits is always wrapped around MULTI/EXEC. */
525     if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) {
526         robj *propargv[1];
527         propargv[0] = createStringObject("EXEC",4);
528         alsoPropagate(server.execCommand,c->db->id,propargv,1,
529             PROPAGATE_AOF|PROPAGATE_REPL);
530         decrRefCount(propargv[0]);
531     }
532 }
533 
534 /* This Redis command binds the normal Redis command invocation with commands
535  * exported by modules. */
RedisModuleCommandDispatcher(client * c)536 void RedisModuleCommandDispatcher(client *c) {
537     RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
538     RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
539 
540     ctx.module = cp->module;
541     ctx.client = c;
542     cp->func(&ctx,(void**)c->argv,c->argc);
543     moduleHandlePropagationAfterCommandCallback(&ctx);
544     moduleFreeContext(&ctx);
545 
546     /* In some cases processMultibulkBuffer uses sdsMakeRoomFor to
547      * expand the query buffer, and in order to avoid a big object copy
548      * the query buffer SDS may be used directly as the SDS string backing
549      * the client argument vectors: sometimes this will result in the SDS
550      * string having unused space at the end. Later if a module takes ownership
551      * of the RedisString, such space will be wasted forever. Inside the
552      * Redis core this is not a problem because tryObjectEncoding() is called
553      * before storing strings in the key space. Here we need to do it
554      * for the module. */
555     for (int i = 0; i < c->argc; i++) {
556         /* Only do the work if the module took ownership of the object:
557          * in that case the refcount is no longer 1. */
558         if (c->argv[i]->refcount > 1)
559             trimStringObjectIfNeeded(c->argv[i]);
560     }
561 }
562 
563 /* This function returns the list of keys, with the same interface as the
564  * 'getkeys' function of the native commands, for module commands that exported
565  * the "getkeys-api" flag during the registration. This is done when the
566  * list of keys are not at fixed positions, so that first/last/step cannot
567  * be used.
568  *
569  * In order to accomplish its work, the module command is called, flagging
570  * the context in a way that the command can recognize this is a special
571  * "get keys" call by calling RedisModule_IsKeysPositionRequest(ctx). */
moduleGetCommandKeysViaAPI(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)572 int *moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
573     RedisModuleCommandProxy *cp = (void*)(unsigned long)cmd->getkeys_proc;
574     RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
575 
576     ctx.module = cp->module;
577     ctx.client = NULL;
578     ctx.flags |= REDISMODULE_CTX_KEYS_POS_REQUEST;
579     cp->func(&ctx,(void**)argv,argc);
580     int *res = ctx.keys_pos;
581     if (numkeys) *numkeys = ctx.keys_count;
582     moduleFreeContext(&ctx);
583     return res;
584 }
585 
586 /* Return non-zero if a module command, that was declared with the
587  * flag "getkeys-api", is called in a special way to get the keys positions
588  * and not to get executed. Otherwise zero is returned. */
RM_IsKeysPositionRequest(RedisModuleCtx * ctx)589 int RM_IsKeysPositionRequest(RedisModuleCtx *ctx) {
590     return (ctx->flags & REDISMODULE_CTX_KEYS_POS_REQUEST) != 0;
591 }
592 
593 /* When a module command is called in order to obtain the position of
594  * keys, since it was flagged as "getkeys-api" during the registration,
595  * the command implementation checks for this special call using the
596  * RedisModule_IsKeysPositionRequest() API and uses this function in
597  * order to report keys, like in the following example:
598  *
599  *     if (RedisModule_IsKeysPositionRequest(ctx)) {
600  *         RedisModule_KeyAtPos(ctx,1);
601  *         RedisModule_KeyAtPos(ctx,2);
602  *     }
603  *
604  *  Note: in the example below the get keys API would not be needed since
605  *  keys are at fixed positions. This interface is only used for commands
606  *  with a more complex structure. */
RM_KeyAtPos(RedisModuleCtx * ctx,int pos)607 void RM_KeyAtPos(RedisModuleCtx *ctx, int pos) {
608     if (!(ctx->flags & REDISMODULE_CTX_KEYS_POS_REQUEST)) return;
609     if (pos <= 0) return;
610     ctx->keys_pos = zrealloc(ctx->keys_pos,sizeof(int)*(ctx->keys_count+1));
611     ctx->keys_pos[ctx->keys_count++] = pos;
612 }
613 
614 /* Helper for RM_CreateCommand(). Turns a string representing command
615  * flags into the command flags used by the Redis core.
616  *
617  * It returns the set of flags, or -1 if unknown flags are found. */
commandFlagsFromString(char * s)618 int commandFlagsFromString(char *s) {
619     int count, j;
620     int flags = 0;
621     sds *tokens = sdssplitlen(s,strlen(s)," ",1,&count);
622     for (j = 0; j < count; j++) {
623         char *t = tokens[j];
624         if (!strcasecmp(t,"write")) flags |= CMD_WRITE;
625         else if (!strcasecmp(t,"readonly")) flags |= CMD_READONLY;
626         else if (!strcasecmp(t,"admin")) flags |= CMD_ADMIN;
627         else if (!strcasecmp(t,"deny-oom")) flags |= CMD_DENYOOM;
628         else if (!strcasecmp(t,"deny-script")) flags |= CMD_NOSCRIPT;
629         else if (!strcasecmp(t,"allow-loading")) flags |= CMD_LOADING;
630         else if (!strcasecmp(t,"pubsub")) flags |= CMD_PUBSUB;
631         else if (!strcasecmp(t,"random")) flags |= CMD_RANDOM;
632         else if (!strcasecmp(t,"allow-stale")) flags |= CMD_STALE;
633         else if (!strcasecmp(t,"no-monitor")) flags |= CMD_SKIP_MONITOR;
634         else if (!strcasecmp(t,"fast")) flags |= CMD_FAST;
635         else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS;
636         else if (!strcasecmp(t,"no-cluster")) flags |= CMD_MODULE_NO_CLUSTER;
637         else break;
638     }
639     sdsfreesplitres(tokens,count);
640     if (j != count) return -1; /* Some token not processed correctly. */
641     return flags;
642 }
643 
644 /* Register a new command in the Redis server, that will be handled by
645  * calling the function pointer 'func' using the RedisModule calling
646  * convention. The function returns REDISMODULE_ERR if the specified command
647  * name is already busy or a set of invalid flags were passed, otherwise
648  * REDISMODULE_OK is returned and the new command is registered.
649  *
650  * This function must be called during the initialization of the module
651  * inside the RedisModule_OnLoad() function. Calling this function outside
652  * of the initialization function is not defined.
653  *
654  * The command function type is the following:
655  *
656  *      int MyCommand_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
657  *
658  * And is supposed to always return REDISMODULE_OK.
659  *
660  * The set of flags 'strflags' specify the behavior of the command, and should
661  * be passed as a C string composed of space separated words, like for
662  * example "write deny-oom". The set of flags are:
663  *
664  * * **"write"**:     The command may modify the data set (it may also read
665  *                    from it).
666  * * **"readonly"**:  The command returns data from keys but never writes.
667  * * **"admin"**:     The command is an administrative command (may change
668  *                    replication or perform similar tasks).
669  * * **"deny-oom"**:  The command may use additional memory and should be
670  *                    denied during out of memory conditions.
671  * * **"deny-script"**:   Don't allow this command in Lua scripts.
672  * * **"allow-loading"**: Allow this command while the server is loading data.
673  *                        Only commands not interacting with the data set
674  *                        should be allowed to run in this mode. If not sure
675  *                        don't use this flag.
676  * * **"pubsub"**:    The command publishes things on Pub/Sub channels.
677  * * **"random"**:    The command may have different outputs even starting
678  *                    from the same input arguments and key values.
679  * * **"allow-stale"**: The command is allowed to run on slaves that don't
680  *                      serve stale data. Don't use if you don't know what
681  *                      this means.
682  * * **"no-monitor"**: Don't propagate the command on monitor. Use this if
683  *                     the command has sensitive data among the arguments.
684  * * **"fast"**:      The command time complexity is not greater
685  *                    than O(log(N)) where N is the size of the collection or
686  *                    anything else representing the normal scalability
687  *                    issue with the command.
688  * * **"getkeys-api"**: The command implements the interface to return
689  *                      the arguments that are keys. Used when start/stop/step
690  *                      is not enough because of the command syntax.
691  * * **"no-cluster"**: The command should not register in Redis Cluster
692  *                     since it is not designed to work with it because, for
693  *                     example, it is unable to report the position of the
694  *                     keys, programmatically creates key names, or any
695  *                     other reason.
696  */
int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) {
    /* A NULL flags string means "no flags"; otherwise parse it and reject
     * the registration if any token was not understood. */
    int flags = 0;
    if (strflags != NULL) {
        flags = commandFlagsFromString((char*)strflags);
        if (flags == -1) return REDISMODULE_ERR;
    }

    /* Commands flagged "no-cluster" can't be registered in cluster mode. */
    if (server.cluster_enabled && (flags & CMD_MODULE_NO_CLUSTER))
        return REDISMODULE_ERR;

    /* Refuse to shadow a command that already exists. */
    sds cmdname = sdsnew(name);
    if (lookupCommand(cmdname) != NULL) {
        sdsfree(cmdname);
        return REDISMODULE_ERR;
    }

    /* Build the proxy and the command table entry it owns. The proxy is
     * what lets the generic dispatcher find the module and the function
     * to call; a reference to it is stored in the 'getkeys_proc' field of
     * the command table entry. */
    RedisModuleCommandProxy *proxy = zmalloc(sizeof(*proxy));
    struct redisCommand *entry = zmalloc(sizeof(*entry));

    entry->name = cmdname;
    entry->proc = RedisModuleCommandDispatcher;
    entry->arity = -1;
    entry->flags = flags | CMD_MODULE;
    entry->getkeys_proc = (redisGetKeysProc*)(unsigned long)proxy;
    entry->firstkey = firstkey;
    entry->lastkey = lastkey;
    entry->keystep = keystep;
    entry->microseconds = 0;
    entry->calls = 0;

    proxy->module = ctx->module;
    proxy->func = cmdfunc;
    proxy->rediscmd = entry;

    /* Register under both the current and the original command tables,
     * each with its own copy of the name as dictionary key. */
    dictAdd(server.commands,sdsdup(cmdname),entry);
    dictAdd(server.orig_commands,sdsdup(cmdname),entry);
    return REDISMODULE_OK;
}
738 
739 /* Called by RM_Init() to setup the `ctx->module` structure.
740  *
741  * This is an internal function, Redis modules developers don't need
742  * to use it. */
void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
    /* Nothing to do if the context already has a module attached. */
    if (ctx->module != NULL) return;

    RedisModule *mod = zmalloc(sizeof(*mod));

    /* Identity of the module. */
    mod->name = sdsnew((char*)name);
    mod->ver = ver;
    mod->apiver = apiver;

    /* Bookkeeping lists start empty. */
    mod->types = listCreate();
    mod->usedby = listCreate();
    mod->using = listCreate();
    mod->filters = listCreate();
    mod->in_call = 0;

    ctx->module = mod;
}
758 
759 /* Return non-zero if the module name is busy.
760  * Otherwise zero is returned. */
RM_IsModuleNameBusy(const char * name)761 int RM_IsModuleNameBusy(const char *name) {
762     sds modulename = sdsnew(name);
763     dictEntry *de = dictFind(modules,modulename);
764     sdsfree(modulename);
765     return de != NULL;
766 }
767 
768 /* Return the current UNIX time in milliseconds. */
long long RM_Milliseconds(void) {
    /* Thin wrapper over the server's mstime() helper. */
    long long now_ms = mstime();
    return now_ms;
}
772 
773 /* --------------------------------------------------------------------------
774  * Automatic memory management for modules
775  * -------------------------------------------------------------------------- */
776 
777 /* Enable automatic memory management. See API.md for more information.
778  *
779  * The function must be called as the first function of a command implementation
780  * that wants to use automatic memory. */
void RM_AutoMemory(RedisModuleCtx *ctx) {
    /* Turning the flag on is all that is needed: autoMemoryAdd() and
     * friends check it before touching the queue. */
    ctx->flags = ctx->flags | REDISMODULE_CTX_AUTO_MEMORY;
}
784 
785 /* Add a new object to release automatically when the callback returns. */
void autoMemoryAdd(RedisModuleCtx *ctx, int type, void *ptr) {
    /* No-op unless the context opted into automatic memory management. */
    if ((ctx->flags & REDISMODULE_CTX_AUTO_MEMORY) == 0) return;

    /* Queue full (or never allocated): double it, with a floor of 16
     * entries. */
    if (ctx->amqueue_used == ctx->amqueue_len) {
        ctx->amqueue_len = ctx->amqueue_len * 2;
        if (ctx->amqueue_len < 16) ctx->amqueue_len = 16;
        ctx->amqueue = zrealloc(ctx->amqueue,sizeof(struct AutoMemEntry)*ctx->amqueue_len);
    }

    /* Append the new entry at the tail. */
    struct AutoMemEntry *slot = &ctx->amqueue[ctx->amqueue_used];
    slot->type = type;
    slot->ptr = ptr;
    ctx->amqueue_used++;
}
797 
798 /* Mark an object as freed in the auto release queue, so that users can still
799  * free things manually if they want.
800  *
801  * The function returns 1 if the object was actually found in the auto memory
802  * pool, otherwise 0 is returned. */
int autoMemoryFreed(RedisModuleCtx *ctx, int type, void *ptr) {
    if ((ctx->flags & REDISMODULE_CTX_AUTO_MEMORY) == 0) return 0;

    int last = ctx->amqueue_used - 1;
    int steps = (ctx->amqueue_used+1)/2;

    /* Zig-zag scan: at every step probe first the j-th element from the
     * right end, then the j-th element from the left end. */
    for (int j = 0; j < steps; j++) {
        int probes[2] = { last - j, j };
        for (int k = 0; k < 2; k++) {
            int i = probes[k];
            if (ctx->amqueue[i].type != type || ctx->amqueue[i].ptr != ptr)
                continue;

            ctx->amqueue[i].type = REDISMODULE_AM_FREED;

            /* Fill the hole with the tail element, so that the queue does
             * not grow unnecessarily when allocating/freeing in a loop. */
            if (i != last) ctx->amqueue[i] = ctx->amqueue[last];

            /* The tail slot is now either moved or freed: drop it. */
            ctx->amqueue_used--;
            return 1;
        }
    }
    return 0;
}
832 
833 /* Release all the objects in queue. */
void autoMemoryCollect(RedisModuleCtx *ctx) {
    if ((ctx->flags & REDISMODULE_CTX_AUTO_MEMORY) == 0) return;

    /* Temporarily switch automatic memory off: the release functions
     * called below would otherwise scan the queue trying to mark the
     * entries as freed. */
    ctx->flags &= ~REDISMODULE_CTX_AUTO_MEMORY;

    for (int idx = 0; idx < ctx->amqueue_used; idx++) {
        void *obj = ctx->amqueue[idx].ptr;
        int kind = ctx->amqueue[idx].type;

        /* Entries of any other type (e.g. already freed ones) are
         * skipped. */
        if (kind == REDISMODULE_AM_STRING) {
            decrRefCount(obj);
        } else if (kind == REDISMODULE_AM_REPLY) {
            RM_FreeCallReply(obj);
        } else if (kind == REDISMODULE_AM_KEY) {
            RM_CloseKey(obj);
        } else if (kind == REDISMODULE_AM_DICT) {
            RM_FreeDict(NULL,obj);
        }
    }

    /* Restore the flag and reset the queue to its empty state. */
    ctx->flags |= REDISMODULE_CTX_AUTO_MEMORY;
    zfree(ctx->amqueue);
    ctx->amqueue = NULL;
    ctx->amqueue_len = 0;
    ctx->amqueue_used = 0;
}
856 
857 /* --------------------------------------------------------------------------
858  * String objects APIs
859  * -------------------------------------------------------------------------- */
860 
861 /* Create a new module string object. The returned string must be freed
862  * with RedisModule_FreeString(), unless automatic memory is enabled.
863  *
864  * The string is created by copying the `len` bytes starting
865  * at `ptr`. No reference is retained to the passed buffer.
866  *
867  * The module context 'ctx' is optional and may be NULL if you want to create
868  * a string out of the context scope. However in that case, the automatic
869  * memory management will not be available, and the string memory must be
870  * managed manually. */
RM_CreateString(RedisModuleCtx * ctx,const char * ptr,size_t len)871 RedisModuleString *RM_CreateString(RedisModuleCtx *ctx, const char *ptr, size_t len) {
872     RedisModuleString *o = createStringObject(ptr,len);
873     if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
874     return o;
875 }
876 
877 /* Create a new module string object from a printf format and arguments.
878  * The returned string must be freed with RedisModule_FreeString(), unless
879  * automatic memory is enabled.
880  *
881  * The string is created using the sds formatter function sdscatvprintf().
882  *
883  * The passed context 'ctx' may be NULL if necessary, see the
884  * RedisModule_CreateString() documentation for more info. */
RM_CreateStringPrintf(RedisModuleCtx * ctx,const char * fmt,...)885 RedisModuleString *RM_CreateStringPrintf(RedisModuleCtx *ctx, const char *fmt, ...) {
886     sds s = sdsempty();
887 
888     va_list ap;
889     va_start(ap, fmt);
890     s = sdscatvprintf(s, fmt, ap);
891     va_end(ap);
892 
893     RedisModuleString *o = createObject(OBJ_STRING, s);
894     if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
895 
896     return o;
897 }
898 
899 
900 /* Like RedisModule_CreateString(), but creates a string starting from a long long
901  * integer instead of taking a buffer and its length.
902  *
903  * The returned string must be released with RedisModule_FreeString() or by
904  * enabling automatic memory management.
905  *
906  * The passed context 'ctx' may be NULL if necessary, see the
907  * RedisModule_CreateString() documentation for more info. */
RM_CreateStringFromLongLong(RedisModuleCtx * ctx,long long ll)908 RedisModuleString *RM_CreateStringFromLongLong(RedisModuleCtx *ctx, long long ll) {
909     char buf[LONG_STR_SIZE];
910     size_t len = ll2string(buf,sizeof(buf),ll);
911     return RM_CreateString(ctx,buf,len);
912 }
913 
914 /* Like RedisModule_CreateString(), but creates a string starting from another
915  * RedisModuleString.
916  *
917  * The returned string must be released with RedisModule_FreeString() or by
918  * enabling automatic memory management.
919  *
920  * The passed context 'ctx' may be NULL if necessary, see the
921  * RedisModule_CreateString() documentation for more info. */
RedisModuleString *RM_CreateStringFromString(RedisModuleCtx *ctx, const RedisModuleString *str) {
    RedisModuleString *copy = dupStringObject(str);

    /* Without a context there is no auto-memory queue to register in. */
    if (ctx == NULL) return copy;

    autoMemoryAdd(ctx,REDISMODULE_AM_STRING,copy);
    return copy;
}
927 
928 /* Free a module string object obtained with one of the Redis modules API calls
929  * that return new string objects.
930  *
931  * It is possible to call this function even when automatic memory management
932  * is enabled. In that case the string will be released ASAP and removed
933  * from the pool of string to release at the end.
934  *
935  * If the string was created with a NULL context 'ctx', it is also possible to
936  * pass ctx as NULL when releasing the string (but passing a context will not
937  * create any issue). Strings created with a context should be freed also passing
938  * the context, so if you want to free a string out of context later, make sure
939  * to create it using a NULL context. */
void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) {
    /* Remove the string from the auto-memory queue *before* dropping the
     * reference. autoMemoryFreed() only compares the pointer value against
     * the queued entries, but once decrRefCount() has released the object
     * that value is formally indeterminate, so it must not be used any
     * longer. Dequeuing first keeps the same observable behavior without
     * touching a dangling pointer. */
    if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str);

    /* Drop our reference: the object is reclaimed when it was the last. */
    decrRefCount(str);
}
944 
945 /* Every call to this function, will make the string 'str' requiring
946  * an additional call to RedisModule_FreeString() in order to really
947  * free the string. Note that the automatic freeing of the string obtained
948  * enabling modules automatic memory management counts for one
949  * RedisModule_FreeString() call (it is just executed automatically).
950  *
951  * Normally you want to call this function when, at the same time
952  * the following conditions are true:
953  *
954  * 1) You have automatic memory management enabled.
955  * 2) You want to create string objects.
956  * 3) Those string objects you create need to live *after* the callback
957  *    function(for example a command implementation) creating them returns.
958  *
959  * Usually you want this in order to store the created string object
960  * into your own data structure, for example when implementing a new data
961  * type.
962  *
963  * Note that when memory management is turned off, you don't need
964  * any call to RetainString() since creating a string will always result
965  * into a string that lives after the callback function returns, if
966  * no FreeString() call is performed.
967  *
968  * It is possible to call this function with a NULL context. */
void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) {
    /* If the string sits in the auto-memory queue, retaining it is just a
     * matter of removing it from there: the pending automatic release is
     * cancelled and the refcount stays at its current value (instead of
     * going up by one now and down by one later). This is needed for
     * functions like RedisModule_StringAppendBuffer() to work. */
    if (ctx != NULL && autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str))
        return;

    /* No context, or the string was not queued: take a real reference. */
    incrRefCount(str);
}
983 
984 /* Given a string module object, this function returns the string pointer
985  * and length of the string. The returned pointer and length should only
986  * be used for read only accesses and never modified. */
RM_StringPtrLen(const RedisModuleString * str,size_t * len)987 const char *RM_StringPtrLen(const RedisModuleString *str, size_t *len) {
988     if (str == NULL) {
989         const char *errmsg = "(NULL string reply referenced in module)";
990         if (len) *len = strlen(errmsg);
991         return errmsg;
992     }
993     if (len) *len = sdslen(str->ptr);
994     return str->ptr;
995 }
996 
997 /* --------------------------------------------------------------------------
998  * Higher level string operations
999  * ------------------------------------------------------------------------- */
1000 
1001 /* Convert the string into a long long integer, storing it at `*ll`.
1002  * Returns REDISMODULE_OK on success. If the string can't be parsed
1003  * as a valid, strict long long (no spaces before/after), REDISMODULE_ERR
1004  * is returned. */
RM_StringToLongLong(const RedisModuleString * str,long long * ll)1005 int RM_StringToLongLong(const RedisModuleString *str, long long *ll) {
1006     return string2ll(str->ptr,sdslen(str->ptr),ll) ? REDISMODULE_OK :
1007                                                      REDISMODULE_ERR;
1008 }
1009 
1010 /* Convert the string into a double, storing it at `*d`.
1011  * Returns REDISMODULE_OK on success or REDISMODULE_ERR if the string is
1012  * not a valid string representation of a double value. */
RM_StringToDouble(const RedisModuleString * str,double * d)1013 int RM_StringToDouble(const RedisModuleString *str, double *d) {
1014     int retval = getDoubleFromObject(str,d);
1015     return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
1016 }
1017 
1018 /* Compare two string objects, returning -1, 0 or 1 respectively if
1019  * a < b, a == b, a > b. Strings are compared byte by byte as two
1020  * binary blobs without any encoding care / collation attempt. */
int RM_StringCompare(RedisModuleString *a, RedisModuleString *b) {
    /* Delegate to the core string object comparison. */
    int order = compareStringObjects(a,b);
    return order;
}
1024 
1025 /* Return the (possibly modified in encoding) input 'str' object if
1026  * the string is unshared, otherwise NULL is returned. */
RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) {
    /* In-place modification is only legal on strings with one owner. */
    if (str->refcount != 1) {
        serverLog(LL_WARNING,
            "Module attempted to use an in-place string modify operation "
            "with a string referenced multiple times. Please check the code "
            "for API usage correctness.");
        return NULL;
    }

    /* Make sure the payload is a standalone sds string (raw encoding). */
    switch (str->encoding) {
    case OBJ_ENCODING_EMBSTR:
        /* Note: the allocation that stored the embedded string together
         * with the object is "leaked" here. */
        str->ptr = sdsnewlen(str->ptr,sdslen(str->ptr));
        str->encoding = OBJ_ENCODING_RAW;
        break;
    case OBJ_ENCODING_INT:
        /* The integer value lives in the pointer itself: render it. */
        str->ptr = sdsfromlonglong((long)str->ptr);
        str->encoding = OBJ_ENCODING_RAW;
        break;
    default:
        /* Any other encoding is left untouched. */
        break;
    }
    return str;
}
1047 
1048 /* Append the specified buffer to the string 'str'. The string must be a
1049  * string created by the user that is referenced only a single time, otherwise
1050  * REDISMODULE_ERR is returned and the operation is not performed. */
int RM_StringAppendBuffer(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len) {
    UNUSED(ctx);

    /* Refuse shared strings; on success the string is raw encoded. */
    RedisModuleString *target = moduleAssertUnsharedString(str);
    if (target == NULL) return REDISMODULE_ERR;

    /* sdscatlen() may reallocate, so store back the returned pointer. */
    target->ptr = sdscatlen(target->ptr,buf,len);
    return REDISMODULE_OK;
}
1058 
1059 /* --------------------------------------------------------------------------
1060  * Reply APIs
1061  *
1062  * Most functions always return REDISMODULE_OK so you can use it with
1063  * 'return' in order to return from the command implementation with:
1064  *
1065  *     if (... some condition ...)
1066  *         return RM_ReplyWithLongLong(ctx,mycount);
1067  * -------------------------------------------------------------------------- */
1068 
1069 /* Send an error about the number of arguments given to the command,
1070  * citing the command name in the error message.
1071  *
1072  * Example:
1073  *
1074  *     if (argc != 3) return RedisModule_WrongArity(ctx);
1075  */
int RM_WrongArity(RedisModuleCtx *ctx) {
    /* The command name is the first argument of the calling client. */
    client *caller = ctx->client;
    char *cmdname = (char*)caller->argv[0]->ptr;

    addReplyErrorFormat(caller,
        "wrong number of arguments for '%s' command",
        cmdname);
    return REDISMODULE_OK;
}
1082 
1083 /* Return the client object the `RM_Reply*` functions should target.
1084  * Normally this is just `ctx->client`, that is the client that called
1085  * the module command, however in the case of thread safe contexts there
1086  * is no directly associated client (since it would not be safe to access
1087  * the client from a thread), so instead the blocked client object referenced
1088  * in the thread safe context, has a fake client that we just use to accumulate
1089  * the replies. Later, when the client is unblocked, the accumulated replies
1090  * are appended to the actual client.
1091  *
1092  * The function returns the client pointer depending on the context, or
1093  * NULL if there is no potential client. This happens when we are in the
1094  * context of a thread safe context that was not initialized with a blocked
1095  * client object. Other contexts without associated clients are the ones
1096  * initialized to run the timers callbacks. */
moduleGetReplyClient(RedisModuleCtx * ctx)1097 client *moduleGetReplyClient(RedisModuleCtx *ctx) {
1098     if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) {
1099         if (ctx->blocked_client)
1100             return ctx->blocked_client->reply_client;
1101         else
1102             return NULL;
1103     } else {
1104         /* If this is a non thread safe context, just return the client
1105          * that is running the command if any. This may be NULL as well
1106          * in the case of contexts that are not executed with associated
1107          * clients, like timer contexts. */
1108         return ctx->client;
1109     }
1110 }
1111 
1112 /* Send an integer reply to the client, with the specified long long value.
1113  * The function always returns REDISMODULE_OK. */
int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
    /* Silently succeed when there is no client to reply to. */
    client *target = moduleGetReplyClient(ctx);
    if (target != NULL) addReplyLongLong(target,ll);
    return REDISMODULE_OK;
}
1120 
1121 /* Reply with an error or simple string (status message). Used to implement
1122  * ReplyWithSimpleString() and ReplyWithError().
1123  * The function always returns REDISMODULE_OK. */
int replyWithStatus(RedisModuleCtx *ctx, const char *msg, char *prefix) {
    client *target = moduleGetReplyClient(ctx);
    if (target == NULL) return REDISMODULE_OK;

    /* Build "<prefix byte><msg>\r\n". Only the first byte of 'prefix'
     * is used. */
    sds line = sdsnewlen(prefix,1);
    line = sdscatlen(sdscat(line,msg),"\r\n",2);

    addReplySds(target,line);
    return REDISMODULE_OK;
}
1133 
1134 /* Reply with the error 'err'.
1135  *
1136  * Note that 'err' must contain all the error, including
1137  * the initial error code. The function only provides the initial "-", so
1138  * the usage is, for example:
1139  *
1140  *     RedisModule_ReplyWithError(ctx,"ERR Wrong Type");
1141  *
1142  * and not just:
1143  *
1144  *     RedisModule_ReplyWithError(ctx,"Wrong Type");
1145  *
1146  * The function always returns REDISMODULE_OK.
1147  */
int RM_ReplyWithError(RedisModuleCtx *ctx, const char *err) {
    /* The status helper is called with "-" as the leading byte. */
    char *prefix = "-";
    return replyWithStatus(ctx,err,prefix);
}
1151 
1152 /* Reply with a simple string (+... \r\n in RESP protocol). This replies
1153  * are suitable only when sending a small non-binary string with small
1154  * overhead, like "OK" or similar replies.
1155  *
1156  * The function always returns REDISMODULE_OK. */
int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) {
    /* The status helper is called with "+" as the leading byte. */
    char *prefix = "+";
    return replyWithStatus(ctx,msg,prefix);
}
1160 
1161 /* Reply with an array type of 'len' elements. However 'len' other calls
1162  * to `ReplyWith*` style functions must follow in order to emit the elements
1163  * of the array.
1164  *
1165  * When producing arrays with a number of element that is not known beforehand
1166  * the function can be called with the special count
1167  * REDISMODULE_POSTPONED_ARRAY_LEN, and the actual number of elements can be
1168  * later set with RedisModule_ReplySetArrayLength() (which will set the
1169  * latest "open" count if there are multiple ones).
1170  *
1171  * The function always returns REDISMODULE_OK. */
int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
    client *target = moduleGetReplyClient(ctx);
    if (target == NULL) return REDISMODULE_OK;

    /* Known length: emit the array header right away. */
    if (len != REDISMODULE_POSTPONED_ARRAY_LEN) {
        addReplyMultiBulkLen(target,len);
        return REDISMODULE_OK;
    }

    /* Postponed length: make room for one more pending array, then
     * remember the deferred length node so that it can be filled later by
     * RedisModule_ReplySetArrayLength(). */
    ctx->postponed_arrays = zrealloc(ctx->postponed_arrays,
            sizeof(void*)*(ctx->postponed_arrays_count+1));
    void *deferred = addDeferredMultiBulkLength(target);
    ctx->postponed_arrays[ctx->postponed_arrays_count] = deferred;
    ctx->postponed_arrays_count++;
    return REDISMODULE_OK;
}
1186 
1187 /* When RedisModule_ReplyWithArray() is used with the argument
1188  * REDISMODULE_POSTPONED_ARRAY_LEN, because we don't know beforehand the number
1189  * of items we are going to output as elements of the array, this function
1190  * will take care to set the array length.
1191  *
1192  * Since it is possible to have multiple array replies pending with unknown
1193  * length, this function guarantees to always set the latest array length
1194  * that was created in a postponed way.
1195  *
1196  * For example in order to output an array like [1,[10,20,30]] we
1197  * could write:
1198  *
1199  *      RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN);
1200  *      RedisModule_ReplyWithLongLong(ctx,1);
1201  *      RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN);
1202  *      RedisModule_ReplyWithLongLong(ctx,10);
1203  *      RedisModule_ReplyWithLongLong(ctx,20);
1204  *      RedisModule_ReplyWithLongLong(ctx,30);
1205  *      RedisModule_ReplySetArrayLength(ctx,3); // Set len of 10,20,30 array.
1206  *      RedisModule_ReplySetArrayLength(ctx,2); // Set len of top array
1207  *
1208  * Note that in the above example there is no reason to postpone the array
1209  * length, since we produce a fixed number of elements, but in the practice
1210  * the code may use an iterator or other ways of creating the output so
1211  * that is not easy to calculate in advance the number of elements.
1212  */
void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
    client *target = moduleGetReplyClient(ctx);
    if (target == NULL) return;

    /* There must be at least one array opened with a postponed length. */
    if (ctx->postponed_arrays_count == 0) {
        serverLog(LL_WARNING,
            "API misuse detected in module %s: "
            "RedisModule_ReplySetArrayLength() called without previous "
            "RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN) "
            "call.", ctx->module->name);
        return;
    }

    /* Pop the most recently opened array and fill in its length. */
    ctx->postponed_arrays_count--;
    void *deferred = ctx->postponed_arrays[ctx->postponed_arrays_count];
    setDeferredMultiBulkLength(target,deferred,len);

    /* Keep the stack around while other arrays are still pending. */
    if (ctx->postponed_arrays_count != 0) return;

    zfree(ctx->postponed_arrays);
    ctx->postponed_arrays = NULL;
}
1233 
1234 /* Reply with a bulk string, taking in input a C buffer pointer and length.
1235  *
1236  * The function always returns REDISMODULE_OK. */
int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
    /* Silently succeed when there is no client to reply to. */
    client *target = moduleGetReplyClient(ctx);
    if (target != NULL) addReplyBulkCBuffer(target,(char*)buf,len);
    return REDISMODULE_OK;
}
1243 
1244 /* Reply with a bulk string, taking in input a RedisModuleString object.
1245  *
1246  * The function always returns REDISMODULE_OK. */
int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
    /* Silently succeed when there is no client to reply to. */
    client *target = moduleGetReplyClient(ctx);
    if (target != NULL) addReplyBulk(target,str);
    return REDISMODULE_OK;
}
1253 
1254 /* Reply to the client with a NULL. In the RESP protocol a NULL is encoded
1255  * as the string "$-1\r\n".
1256  *
1257  * The function always returns REDISMODULE_OK. */
int RM_ReplyWithNull(RedisModuleCtx *ctx) {
    /* Silently succeed when there is no client to reply to. */
    client *target = moduleGetReplyClient(ctx);
    if (target != NULL) addReply(target,shared.nullbulk);
    return REDISMODULE_OK;
}
1264 
1265 /* Reply exactly what a Redis command returned us with RedisModule_Call().
1266  * This function is useful when we use RedisModule_Call() in order to
1267  * execute some command, as we want to reply to the client exactly the
1268  * same reply we obtained by the command.
1269  *
1270  * The function always returns REDISMODULE_OK. */
int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
    client *target = moduleGetReplyClient(ctx);
    if (target == NULL) return REDISMODULE_OK;

    /* Forward a copy of the protocol bytes held by the call reply. */
    addReplySds(target,sdsnewlen(reply->proto, reply->protolen));
    return REDISMODULE_OK;
}
1278 
1279 /* Send a string reply obtained converting the double 'd' into a bulk string.
1280  * This function is basically equivalent to converting a double into
1281  * a string into a C buffer, and then calling the function
1282  * RedisModule_ReplyWithStringBuffer() with the buffer and length.
1283  *
1284  * The function always returns REDISMODULE_OK. */
int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
    /* Silently succeed when there is no client to reply to. */
    client *target = moduleGetReplyClient(ctx);
    if (target != NULL) addReplyDouble(target,d);
    return REDISMODULE_OK;
}
1291 
1292 /* --------------------------------------------------------------------------
1293  * Commands replication API
1294  * -------------------------------------------------------------------------- */
1295 
1296 /* Helper function to replicate MULTI the first time we replicate something
1297  * in the context of a command execution. EXEC will be handled by the
1298  * RedisModuleCommandDispatcher() function. */
void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
    /* Nothing to do when the client explicitly wrapped the command with
     * MULTI, or when the module command was called by a script. */
    if (ctx->client->flags & (CLIENT_MULTI|CLIENT_LUA)) return;

    /* Nothing to do either if MULTI was already emitted for this context,
     * or if this is a thread safe context: commands executed there are
     * not wrapped into MULTI/EXEC, they are executed as single commands
     * from an external client in essence. */
    if (ctx->flags &
        (REDISMODULE_CTX_MULTI_EMITTED|REDISMODULE_CTX_THREAD_SAFE))
        return;

    /* Emit MULTI and remember that we did. */
    execCommandPropagateMulti(ctx->client);
    ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED;
}
1312 
1313 /* Replicate the specified command and arguments to slaves and AOF, as effect
1314  * of execution of the calling command implementation.
1315  *
1316  * The replicated commands are always wrapped into the MULTI/EXEC that
1317  * contains all the commands replicated in a given module command
1318  * execution. However the commands replicated with RedisModule_Call()
1319  * are the first items, the ones replicated with RedisModule_Replicate()
1320  * will all follow before the EXEC.
1321  *
1322  * Modules should try to use one interface or the other.
1323  *
1324  * This command follows exactly the same interface of RedisModule_Call(),
1325  * so a set of format specifiers must be passed, followed by arguments
1326  * matching the provided format specifiers.
1327  *
1328  * Please refer to RedisModule_Call() for more information.
1329  *
1330  * The command returns REDISMODULE_ERR if the format specifiers are invalid
1331  * or the command name does not belong to a known command. */
RM_Replicate(RedisModuleCtx * ctx,const char * cmdname,const char * fmt,...)1332 int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
1333     struct redisCommand *cmd;
1334     robj **argv = NULL;
1335     int argc = 0, flags = 0, j;
1336     va_list ap;
1337 
1338     cmd = lookupCommandByCString((char*)cmdname);
1339     if (!cmd) return REDISMODULE_ERR;
1340 
1341     /* Create the client and dispatch the command. */
1342     va_start(ap, fmt);
1343     argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
1344     va_end(ap);
1345     if (argv == NULL) return REDISMODULE_ERR;
1346 
1347     /* Replicate! */
1348     moduleReplicateMultiIfNeeded(ctx);
1349     alsoPropagate(cmd,ctx->client->db->id,argv,argc,
1350         PROPAGATE_AOF|PROPAGATE_REPL);
1351 
1352     /* Release the argv. */
1353     for (j = 0; j < argc; j++) decrRefCount(argv[j]);
1354     zfree(argv);
1355     server.dirty++;
1356     return REDISMODULE_OK;
1357 }
1358 
1359 /* This function will replicate the command exactly as it was invoked
1360  * by the client. Note that this function will not wrap the command into
1361  * a MULTI/EXEC stanza, so it should not be mixed with other replication
1362  * commands.
1363  *
1364  * Basically this form of replication is useful when you want to propagate
1365  * the command to the slaves and AOF file exactly as it was called, since
1366  * the command can just be re-executed to deterministically re-create the
1367  * new state starting from the old one.
1368  *
1369  * The function always returns REDISMODULE_OK. */
RM_ReplicateVerbatim(RedisModuleCtx * ctx)1370 int RM_ReplicateVerbatim(RedisModuleCtx *ctx) {
1371     alsoPropagate(ctx->client->cmd,ctx->client->db->id,
1372         ctx->client->argv,ctx->client->argc,
1373         PROPAGATE_AOF|PROPAGATE_REPL);
1374     server.dirty++;
1375     return REDISMODULE_OK;
1376 }
1377 
1378 /* --------------------------------------------------------------------------
1379  * DB and Key APIs -- Generic API
1380  * -------------------------------------------------------------------------- */
1381 
1382 /* Return the ID of the current client calling the currently active module
1383  * command. The returned ID has a few guarantees:
1384  *
1385  * 1. The ID is different for each different client, so if the same client
1386  *    executes a module command multiple times, it can be recognized as
1387  *    having the same ID, otherwise the ID will be different.
1388  * 2. The ID increases monotonically. Clients connecting to the server later
1389  *    are guaranteed to get IDs greater than any past ID previously seen.
1390  *
1391  * Valid IDs are from 1 to 2^64-1. If 0 is returned it means there is no way
1392  * to fetch the ID in the context the function was currently called. */
RM_GetClientId(RedisModuleCtx * ctx)1393 unsigned long long RM_GetClientId(RedisModuleCtx *ctx) {
1394     if (ctx->client == NULL) return 0;
1395     return ctx->client->id;
1396 }
1397 
1398 /* Return the currently selected DB. */
RM_GetSelectedDb(RedisModuleCtx * ctx)1399 int RM_GetSelectedDb(RedisModuleCtx *ctx) {
1400     return ctx->client->db->id;
1401 }
1402 
1403 
1404 /* Return the current context's flags. The flags provide information on the
1405  * current request context (whether the client is a Lua script or in a MULTI),
1406  * and about the Redis instance in general, i.e replication and persistence.
1407  *
1408  * The available flags are:
1409  *
1410  *  * REDISMODULE_CTX_FLAGS_LUA: The command is running in a Lua script
1411  *
1412  *  * REDISMODULE_CTX_FLAGS_MULTI: The command is running inside a transaction
1413  *
1414  *  * REDISMODULE_CTX_FLAGS_REPLICATED: The command was sent over the replication
1415  *    link by the MASTER
1416  *
1417  *  * REDISMODULE_CTX_FLAGS_MASTER: The Redis instance is a master
1418  *
1419  *  * REDISMODULE_CTX_FLAGS_SLAVE: The Redis instance is a slave
1420  *
1421  *  * REDISMODULE_CTX_FLAGS_READONLY: The Redis instance is read-only
1422  *
1423  *  * REDISMODULE_CTX_FLAGS_CLUSTER: The Redis instance is in cluster mode
1424  *
1425  *  * REDISMODULE_CTX_FLAGS_AOF: The Redis instance has AOF enabled
1426  *
1427  *  * REDISMODULE_CTX_FLAGS_RDB: The instance has RDB enabled
1428  *
1429  *  * REDISMODULE_CTX_FLAGS_MAXMEMORY:  The instance has Maxmemory set
1430  *
1431  *  * REDISMODULE_CTX_FLAGS_EVICT:  Maxmemory is set and has an eviction
1432  *    policy that may delete keys
1433  *
1434  *  * REDISMODULE_CTX_FLAGS_OOM: Redis is out of memory according to the
1435  *    maxmemory setting.
1436  *
1437  *  * REDISMODULE_CTX_FLAGS_OOM_WARNING: Less than 25% of memory remains before
1438  *                                       reaching the maxmemory level.
1439  */
RM_GetContextFlags(RedisModuleCtx * ctx)1440 int RM_GetContextFlags(RedisModuleCtx *ctx) {
1441 
1442     int flags = 0;
1443     /* Client specific flags */
1444     if (ctx->client) {
1445         if (ctx->client->flags & CLIENT_LUA)
1446          flags |= REDISMODULE_CTX_FLAGS_LUA;
1447         if (ctx->client->flags & CLIENT_MULTI)
1448          flags |= REDISMODULE_CTX_FLAGS_MULTI;
1449         /* Module command recieved from MASTER, is replicated. */
1450         if (ctx->client->flags & CLIENT_MASTER)
1451          flags |= REDISMODULE_CTX_FLAGS_REPLICATED;
1452     }
1453 
1454     if (server.cluster_enabled)
1455         flags |= REDISMODULE_CTX_FLAGS_CLUSTER;
1456 
1457     /* Maxmemory and eviction policy */
1458     if (server.maxmemory > 0) {
1459         flags |= REDISMODULE_CTX_FLAGS_MAXMEMORY;
1460 
1461         if (server.maxmemory_policy != MAXMEMORY_NO_EVICTION)
1462             flags |= REDISMODULE_CTX_FLAGS_EVICT;
1463     }
1464 
1465     /* Persistence flags */
1466     if (server.aof_state != AOF_OFF)
1467         flags |= REDISMODULE_CTX_FLAGS_AOF;
1468     if (server.saveparamslen > 0)
1469         flags |= REDISMODULE_CTX_FLAGS_RDB;
1470 
1471     /* Replication flags */
1472     if (server.masterhost == NULL) {
1473         flags |= REDISMODULE_CTX_FLAGS_MASTER;
1474     } else {
1475         flags |= REDISMODULE_CTX_FLAGS_SLAVE;
1476         if (server.repl_slave_ro)
1477             flags |= REDISMODULE_CTX_FLAGS_READONLY;
1478     }
1479 
1480     /* OOM flag. */
1481     float level;
1482     int retval = getMaxmemoryState(NULL,NULL,NULL,&level);
1483     if (retval == C_ERR) flags |= REDISMODULE_CTX_FLAGS_OOM;
1484     if (level > 0.75) flags |= REDISMODULE_CTX_FLAGS_OOM_WARNING;
1485 
1486     return flags;
1487 }
1488 
1489 /* Change the currently selected DB. Returns an error if the id
1490  * is out of range.
1491  *
1492  * Note that the client will retain the currently selected DB even after
1493  * the Redis command implemented by the module calling this function
1494  * returns.
1495  *
1496  * If the module command wishes to change something in a different DB and
1497  * returns back to the original one, it should call RedisModule_GetSelectedDb()
1498  * before in order to restore the old DB number before returning. */
RM_SelectDb(RedisModuleCtx * ctx,int newid)1499 int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
1500     int retval = selectDb(ctx->client,newid);
1501     return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
1502 }
1503 
1504 /* Return an handle representing a Redis key, so that it is possible
1505  * to call other APIs with the key handle as argument to perform
1506  * operations on the key.
1507  *
1508  * The return value is the handle representing the key, that must be
1509  * closed with RM_CloseKey().
1510  *
1511  * If the key does not exist and WRITE mode is requested, the handle
1512  * is still returned, since it is possible to perform operations on
1513  * a yet not existing key (that will be created, for example, after
1514  * a list push operation). If the mode is just READ instead, and the
1515  * key does not exist, NULL is returned. However it is still safe to
1516  * call RedisModule_CloseKey() and RedisModule_KeyType() on a NULL
1517  * value. */
RM_OpenKey(RedisModuleCtx * ctx,robj * keyname,int mode)1518 void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
1519     RedisModuleKey *kp;
1520     robj *value;
1521 
1522     if (mode & REDISMODULE_WRITE) {
1523         value = lookupKeyWrite(ctx->client->db,keyname);
1524     } else {
1525         value = lookupKeyRead(ctx->client->db,keyname);
1526         if (value == NULL) {
1527             return NULL;
1528         }
1529     }
1530 
1531     /* Setup the key handle. */
1532     kp = zmalloc(sizeof(*kp));
1533     kp->ctx = ctx;
1534     kp->db = ctx->client->db;
1535     kp->key = keyname;
1536     incrRefCount(keyname);
1537     kp->value = value;
1538     kp->iter = NULL;
1539     kp->mode = mode;
1540     zsetKeyReset(kp);
1541     autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp);
1542     return (void*)kp;
1543 }
1544 
1545 /* Close a key handle. */
RM_CloseKey(RedisModuleKey * key)1546 void RM_CloseKey(RedisModuleKey *key) {
1547     if (key == NULL) return;
1548     if (key->mode & REDISMODULE_WRITE) signalModifiedKey(key->db,key->key);
1549     /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
1550     RM_ZsetRangeStop(key);
1551     decrRefCount(key->key);
1552     autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key);
1553     zfree(key);
1554 }
1555 
1556 /* Return the type of the key. If the key pointer is NULL then
1557  * REDISMODULE_KEYTYPE_EMPTY is returned. */
RM_KeyType(RedisModuleKey * key)1558 int RM_KeyType(RedisModuleKey *key) {
1559     if (key == NULL || key->value ==  NULL) return REDISMODULE_KEYTYPE_EMPTY;
1560     /* We map between defines so that we are free to change the internal
1561      * defines as desired. */
1562     switch(key->value->type) {
1563     case OBJ_STRING: return REDISMODULE_KEYTYPE_STRING;
1564     case OBJ_LIST: return REDISMODULE_KEYTYPE_LIST;
1565     case OBJ_SET: return REDISMODULE_KEYTYPE_SET;
1566     case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET;
1567     case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH;
1568     case OBJ_MODULE: return REDISMODULE_KEYTYPE_MODULE;
1569     default: return 0;
1570     }
1571 }
1572 
1573 /* Return the length of the value associated with the key.
1574  * For strings this is the length of the string. For all the other types
1575  * is the number of elements (just counting keys for hashes).
1576  *
1577  * If the key pointer is NULL or the key is empty, zero is returned. */
RM_ValueLength(RedisModuleKey * key)1578 size_t RM_ValueLength(RedisModuleKey *key) {
1579     if (key == NULL || key->value == NULL) return 0;
1580     switch(key->value->type) {
1581     case OBJ_STRING: return stringObjectLen(key->value);
1582     case OBJ_LIST: return listTypeLength(key->value);
1583     case OBJ_SET: return setTypeSize(key->value);
1584     case OBJ_ZSET: return zsetLength(key->value);
1585     case OBJ_HASH: return hashTypeLength(key->value);
1586     default: return 0;
1587     }
1588 }
1589 
1590 /* If the key is open for writing, remove it, and setup the key to
1591  * accept new writes as an empty key (that will be created on demand).
1592  * On success REDISMODULE_OK is returned. If the key is not open for
1593  * writing REDISMODULE_ERR is returned. */
RM_DeleteKey(RedisModuleKey * key)1594 int RM_DeleteKey(RedisModuleKey *key) {
1595     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1596     if (key->value) {
1597         dbDelete(key->db,key->key);
1598         key->value = NULL;
1599     }
1600     return REDISMODULE_OK;
1601 }
1602 
1603 /* If the key is open for writing, unlink it (that is delete it in a
1604  * non-blocking way, not reclaiming memory immediately) and setup the key to
1605  * accept new writes as an empty key (that will be created on demand).
1606  * On success REDISMODULE_OK is returned. If the key is not open for
1607  * writing REDISMODULE_ERR is returned. */
RM_UnlinkKey(RedisModuleKey * key)1608 int RM_UnlinkKey(RedisModuleKey *key) {
1609     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1610     if (key->value) {
1611         dbAsyncDelete(key->db,key->key);
1612         key->value = NULL;
1613     }
1614     return REDISMODULE_OK;
1615 }
1616 
1617 /* Return the key expire value, as milliseconds of remaining TTL.
1618  * If no TTL is associated with the key or if the key is empty,
1619  * REDISMODULE_NO_EXPIRE is returned. */
RM_GetExpire(RedisModuleKey * key)1620 mstime_t RM_GetExpire(RedisModuleKey *key) {
1621     mstime_t expire = getExpire(key->db,key->key);
1622     if (expire == -1 || key->value == NULL) return -1;
1623     expire -= mstime();
1624     return expire >= 0 ? expire : 0;
1625 }
1626 
1627 /* Set a new expire for the key. If the special expire
1628  * REDISMODULE_NO_EXPIRE is set, the expire is cancelled if there was
1629  * one (the same as the PERSIST command).
1630  *
1631  * Note that the expire must be provided as a positive integer representing
1632  * the number of milliseconds of TTL the key should have.
1633  *
1634  * The function returns REDISMODULE_OK on success or REDISMODULE_ERR if
1635  * the key was not open for writing or is an empty key. */
RM_SetExpire(RedisModuleKey * key,mstime_t expire)1636 int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
1637     if (!(key->mode & REDISMODULE_WRITE) || key->value == NULL)
1638         return REDISMODULE_ERR;
1639     if (expire != REDISMODULE_NO_EXPIRE) {
1640         expire += mstime();
1641         setExpire(key->ctx->client,key->db,key->key,expire);
1642     } else {
1643         removeExpire(key->db,key->key);
1644     }
1645     return REDISMODULE_OK;
1646 }
1647 
1648 /* --------------------------------------------------------------------------
1649  * Key API for String type
1650  * -------------------------------------------------------------------------- */
1651 
1652 /* If the key is open for writing, set the specified string 'str' as the
1653  * value of the key, deleting the old value if any.
1654  * On success REDISMODULE_OK is returned. If the key is not open for
1655  * writing or there is an active iterator, REDISMODULE_ERR is returned. */
RM_StringSet(RedisModuleKey * key,RedisModuleString * str)1656 int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) {
1657     if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
1658     RM_DeleteKey(key);
1659     setKey(key->db,key->key,str);
1660     key->value = str;
1661     return REDISMODULE_OK;
1662 }
1663 
1664 /* Prepare the key associated string value for DMA access, and returns
1665  * a pointer and size (by reference), that the user can use to read or
1666  * modify the string in-place accessing it directly via pointer.
1667  *
1668  * The 'mode' is composed by bitwise OR-ing the following flags:
1669  *
1670  *     REDISMODULE_READ -- Read access
1671  *     REDISMODULE_WRITE -- Write access
1672  *
1673  * If the DMA is not requested for writing, the pointer returned should
1674  * only be accessed in a read-only fashion.
1675  *
1676  * On error (wrong type) NULL is returned.
1677  *
1678  * DMA access rules:
1679  *
1680  * 1. No other key writing function should be called since the moment
1681  * the pointer is obtained, for all the time we want to use DMA access
1682  * to read or modify the string.
1683  *
1684  * 2. Each time RM_StringTruncate() is called, to continue with the DMA
1685  * access, RM_StringDMA() should be called again to re-obtain
1686  * a new pointer and length.
1687  *
1688  * 3. If the returned pointer is not NULL, but the length is zero, no
1689  * byte can be touched (the string is empty, or the key itself is empty)
1690  * so a RM_StringTruncate() call should be used if there is to enlarge
1691  * the string, and later call StringDMA() again to get the pointer.
1692  */
RM_StringDMA(RedisModuleKey * key,size_t * len,int mode)1693 char *RM_StringDMA(RedisModuleKey *key, size_t *len, int mode) {
1694     /* We need to return *some* pointer for empty keys, we just return
1695      * a string literal pointer, that is the advantage to be mapped into
1696      * a read only memory page, so the module will segfault if a write
1697      * attempt is performed. */
1698     char *emptystring = "<dma-empty-string>";
1699     if (key->value == NULL) {
1700         *len = 0;
1701         return emptystring;
1702     }
1703 
1704     if (key->value->type != OBJ_STRING) return NULL;
1705 
1706     /* For write access, and even for read access if the object is encoded,
1707      * we unshare the string (that has the side effect of decoding it). */
1708     if ((mode & REDISMODULE_WRITE) || key->value->encoding != OBJ_ENCODING_RAW)
1709         key->value = dbUnshareStringValue(key->db, key->key, key->value);
1710 
1711     *len = sdslen(key->value->ptr);
1712     return key->value->ptr;
1713 }
1714 
1715 /* If the string is open for writing and is of string type, resize it, padding
1716  * with zero bytes if the new length is greater than the old one.
1717  *
1718  * After this call, RM_StringDMA() must be called again to continue
1719  * DMA access with the new pointer.
1720  *
1721  * The function returns REDISMODULE_OK on success, and REDISMODULE_ERR on
1722  * error, that is, the key is not open for writing, is not a string
1723  * or resizing for more than 512 MB is requested.
1724  *
1725  * If the key is empty, a string key is created with the new string value
1726  * unless the new length value requested is zero. */
RM_StringTruncate(RedisModuleKey * key,size_t newlen)1727 int RM_StringTruncate(RedisModuleKey *key, size_t newlen) {
1728     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1729     if (key->value && key->value->type != OBJ_STRING) return REDISMODULE_ERR;
1730     if (newlen > 512*1024*1024) return REDISMODULE_ERR;
1731 
1732     /* Empty key and new len set to 0. Just return REDISMODULE_OK without
1733      * doing anything. */
1734     if (key->value == NULL && newlen == 0) return REDISMODULE_OK;
1735 
1736     if (key->value == NULL) {
1737         /* Empty key: create it with the new size. */
1738         robj *o = createObject(OBJ_STRING,sdsnewlen(NULL, newlen));
1739         setKey(key->db,key->key,o);
1740         key->value = o;
1741         decrRefCount(o);
1742     } else {
1743         /* Unshare and resize. */
1744         key->value = dbUnshareStringValue(key->db, key->key, key->value);
1745         size_t curlen = sdslen(key->value->ptr);
1746         if (newlen > curlen) {
1747             key->value->ptr = sdsgrowzero(key->value->ptr,newlen);
1748         } else if (newlen < curlen) {
1749             sdsrange(key->value->ptr,0,newlen-1);
1750             /* If the string is too wasteful, reallocate it. */
1751             if (sdslen(key->value->ptr) < sdsavail(key->value->ptr))
1752                 key->value->ptr = sdsRemoveFreeSpace(key->value->ptr);
1753         }
1754     }
1755     return REDISMODULE_OK;
1756 }
1757 
1758 /* --------------------------------------------------------------------------
1759  * Key API for List type
1760  * -------------------------------------------------------------------------- */
1761 
1762 /* Push an element into a list, on head or tail depending on 'where' argument.
1763  * If the key pointer is about an empty key opened for writing, the key
1764  * is created. On error (key opened for read-only operations or of the wrong
1765  * type) REDISMODULE_ERR is returned, otherwise REDISMODULE_OK is returned. */
RM_ListPush(RedisModuleKey * key,int where,RedisModuleString * ele)1766 int RM_ListPush(RedisModuleKey *key, int where, RedisModuleString *ele) {
1767     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1768     if (key->value && key->value->type != OBJ_LIST) return REDISMODULE_ERR;
1769     if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_LIST);
1770     listTypePush(key->value, ele,
1771         (where == REDISMODULE_LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL);
1772     return REDISMODULE_OK;
1773 }
1774 
1775 /* Pop an element from the list, and returns it as a module string object
1776  * that the user should be free with RM_FreeString() or by enabling
1777  * automatic memory. 'where' specifies if the element should be popped from
1778  * head or tail. The command returns NULL if:
1779  * 1) The list is empty.
1780  * 2) The key was not open for writing.
1781  * 3) The key is not a list. */
RM_ListPop(RedisModuleKey * key,int where)1782 RedisModuleString *RM_ListPop(RedisModuleKey *key, int where) {
1783     if (!(key->mode & REDISMODULE_WRITE) ||
1784         key->value == NULL ||
1785         key->value->type != OBJ_LIST) return NULL;
1786     robj *ele = listTypePop(key->value,
1787         (where == REDISMODULE_LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL);
1788     robj *decoded = getDecodedObject(ele);
1789     decrRefCount(ele);
1790     moduleDelKeyIfEmpty(key);
1791     autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,decoded);
1792     return decoded;
1793 }
1794 
1795 /* --------------------------------------------------------------------------
1796  * Key API for Sorted Set type
1797  * -------------------------------------------------------------------------- */
1798 
1799 /* Conversion from/to public flags of the Modules API and our private flags,
1800  * so that we have everything decoupled. */
RM_ZsetAddFlagsToCoreFlags(int flags)1801 int RM_ZsetAddFlagsToCoreFlags(int flags) {
1802     int retflags = 0;
1803     if (flags & REDISMODULE_ZADD_XX) retflags |= ZADD_XX;
1804     if (flags & REDISMODULE_ZADD_NX) retflags |= ZADD_NX;
1805     return retflags;
1806 }
1807 
1808 /* See previous function comment. */
RM_ZsetAddFlagsFromCoreFlags(int flags)1809 int RM_ZsetAddFlagsFromCoreFlags(int flags) {
1810     int retflags = 0;
1811     if (flags & ZADD_ADDED) retflags |= REDISMODULE_ZADD_ADDED;
1812     if (flags & ZADD_UPDATED) retflags |= REDISMODULE_ZADD_UPDATED;
1813     if (flags & ZADD_NOP) retflags |= REDISMODULE_ZADD_NOP;
1814     return retflags;
1815 }
1816 
1817 /* Add a new element into a sorted set, with the specified 'score'.
1818  * If the element already exists, the score is updated.
1819  *
1820  * A new sorted set is created at value if the key is an empty open key
1821  * setup for writing.
1822  *
1823  * Additional flags can be passed to the function via a pointer, the flags
1824  * are both used to receive input and to communicate state when the function
1825  * returns. 'flagsptr' can be NULL if no special flags are used.
1826  *
1827  * The input flags are:
1828  *
1829  *     REDISMODULE_ZADD_XX: Element must already exist. Do nothing otherwise.
1830  *     REDISMODULE_ZADD_NX: Element must not exist. Do nothing otherwise.
1831  *
1832  * The output flags are:
1833  *
1834  *     REDISMODULE_ZADD_ADDED: The new element was added to the sorted set.
1835  *     REDISMODULE_ZADD_UPDATED: The score of the element was updated.
1836  *     REDISMODULE_ZADD_NOP: No operation was performed because XX or NX flags.
1837  *
1838  * On success the function returns REDISMODULE_OK. On the following errors
1839  * REDISMODULE_ERR is returned:
1840  *
1841  * * The key was not opened for writing.
1842  * * The key is of the wrong type.
1843  * * 'score' double value is not a number (NaN).
1844  */
RM_ZsetAdd(RedisModuleKey * key,double score,RedisModuleString * ele,int * flagsptr)1845 int RM_ZsetAdd(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr) {
1846     int flags = 0;
1847     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1848     if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
1849     if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_ZSET);
1850     if (flagsptr) flags = RM_ZsetAddFlagsToCoreFlags(*flagsptr);
1851     if (zsetAdd(key->value,score,ele->ptr,&flags,NULL) == 0) {
1852         if (flagsptr) *flagsptr = 0;
1853         return REDISMODULE_ERR;
1854     }
1855     if (flagsptr) *flagsptr = RM_ZsetAddFlagsFromCoreFlags(flags);
1856     return REDISMODULE_OK;
1857 }
1858 
1859 /* This function works exactly like RM_ZsetAdd(), but instead of setting
1860  * a new score, the score of the existing element is incremented, or if the
1861  * element does not already exist, it is added assuming the old score was
1862  * zero.
1863  *
1864  * The input and output flags, and the return value, have the same exact
1865  * meaning, with the only difference that this function will return
1866  * REDISMODULE_ERR even when 'score' is a valid double number, but adding it
1867  * to the existing score results into a NaN (not a number) condition.
1868  *
1869  * This function has an additional field 'newscore', if not NULL is filled
1870  * with the new score of the element after the increment, if no error
1871  * is returned. */
RM_ZsetIncrby(RedisModuleKey * key,double score,RedisModuleString * ele,int * flagsptr,double * newscore)1872 int RM_ZsetIncrby(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore) {
1873     int flags = 0;
1874     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1875     if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
1876     if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_ZSET);
1877     if (flagsptr) flags = RM_ZsetAddFlagsToCoreFlags(*flagsptr);
1878     flags |= ZADD_INCR;
1879     if (zsetAdd(key->value,score,ele->ptr,&flags,newscore) == 0) {
1880         if (flagsptr) *flagsptr = 0;
1881         return REDISMODULE_ERR;
1882     }
1883     /* zsetAdd() may signal back that the resulting score is not a number. */
1884     if (flagsptr && (*flagsptr & ZADD_NAN)) {
1885         *flagsptr = 0;
1886         return REDISMODULE_ERR;
1887     }
1888     if (flagsptr) *flagsptr = RM_ZsetAddFlagsFromCoreFlags(flags);
1889     return REDISMODULE_OK;
1890 }
1891 
1892 /* Remove the specified element from the sorted set.
1893  * The function returns REDISMODULE_OK on success, and REDISMODULE_ERR
1894  * on one of the following conditions:
1895  *
1896  * * The key was not opened for writing.
1897  * * The key is of the wrong type.
1898  *
1899  * The return value does NOT indicate the fact the element was really
1900  * removed (since it existed) or not, just if the function was executed
1901  * with success.
1902  *
1903  * In order to know if the element was removed, the additional argument
1904  * 'deleted' must be passed, that populates the integer by reference
1905  * setting it to 1 or 0 depending on the outcome of the operation.
1906  * The 'deleted' argument can be NULL if the caller is not interested
1907  * to know if the element was really removed.
1908  *
1909  * Empty keys will be handled correctly by doing nothing. */
RM_ZsetRem(RedisModuleKey * key,RedisModuleString * ele,int * deleted)1910 int RM_ZsetRem(RedisModuleKey *key, RedisModuleString *ele, int *deleted) {
1911     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1912     if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
1913     if (key->value != NULL && zsetDel(key->value,ele->ptr)) {
1914         if (deleted) *deleted = 1;
1915     } else {
1916         if (deleted) *deleted = 0;
1917     }
1918     return REDISMODULE_OK;
1919 }
1920 
1921 /* On success retrieve the double score associated at the sorted set element
1922  * 'ele' and returns REDISMODULE_OK. Otherwise REDISMODULE_ERR is returned
1923  * to signal one of the following conditions:
1924  *
1925  * * There is no such element 'ele' in the sorted set.
1926  * * The key is not a sorted set.
1927  * * The key is an open empty key.
1928  */
RM_ZsetScore(RedisModuleKey * key,RedisModuleString * ele,double * score)1929 int RM_ZsetScore(RedisModuleKey *key, RedisModuleString *ele, double *score) {
1930     if (key->value == NULL) return REDISMODULE_ERR;
1931     if (key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
1932     if (zsetScore(key->value,ele->ptr,score) == C_ERR) return REDISMODULE_ERR;
1933     return REDISMODULE_OK;
1934 }
1935 
1936 /* --------------------------------------------------------------------------
1937  * Key API for Sorted Set iterator
1938  * -------------------------------------------------------------------------- */
1939 
/* Put the sorted set iteration state of the key handle into its "no
 * iterator active" condition: no range type, no current element, and the
 * "end of range" flag set. */
void zsetKeyReset(RedisModuleKey *key) {
    key->zer = 1;
    key->zcurrent = NULL;
    key->ztype = REDISMODULE_ZSET_RANGE_NONE;
}
1945 
1946 /* Stop a sorted set iteration. */
RM_ZsetRangeStop(RedisModuleKey * key)1947 void RM_ZsetRangeStop(RedisModuleKey *key) {
1948     /* Free resources if needed. */
1949     if (key->ztype == REDISMODULE_ZSET_RANGE_LEX)
1950         zslFreeLexRange(&key->zlrs);
1951     /* Setup sensible values so that misused iteration API calls when an
1952      * iterator is not active will result into something more sensible
1953      * than crashing. */
1954     zsetKeyReset(key);
1955 }
1956 
1957 /* Return the "End of range" flag value to signal the end of the iteration. */
RM_ZsetRangeEndReached(RedisModuleKey * key)1958 int RM_ZsetRangeEndReached(RedisModuleKey *key) {
1959     return key->zer;
1960 }
1961 
1962 /* Helper function for RM_ZsetFirstInScoreRange() and RM_ZsetLastInScoreRange().
1963  * Setup the sorted set iteration according to the specified score range
1964  * (see the functions calling it for more info). If 'first' is true the
1965  * first element in the range is used as a starting point for the iterator
1966  * otherwise the last. Return REDISMODULE_OK on success otherwise
1967  * REDISMODULE_ERR. */
zsetInitScoreRange(RedisModuleKey * key,double min,double max,int minex,int maxex,int first)1968 int zsetInitScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex, int first) {
1969     if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
1970 
1971     RM_ZsetRangeStop(key);
1972     key->ztype = REDISMODULE_ZSET_RANGE_SCORE;
1973     key->zer = 0;
1974 
1975     /* Setup the range structure used by the sorted set core implementation
1976      * in order to seek at the specified element. */
1977     zrangespec *zrs = &key->zrs;
1978     zrs->min = min;
1979     zrs->max = max;
1980     zrs->minex = minex;
1981     zrs->maxex = maxex;
1982 
1983     if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
1984         key->zcurrent = first ? zzlFirstInRange(key->value->ptr,zrs) :
1985                                 zzlLastInRange(key->value->ptr,zrs);
1986     } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
1987         zset *zs = key->value->ptr;
1988         zskiplist *zsl = zs->zsl;
1989         key->zcurrent = first ? zslFirstInRange(zsl,zrs) :
1990                                 zslLastInRange(zsl,zrs);
1991     } else {
1992         serverPanic("Unsupported zset encoding");
1993     }
1994     if (key->zcurrent == NULL) key->zer = 1;
1995     return REDISMODULE_OK;
1996 }
1997 
1998 /* Setup a sorted set iterator seeking the first element in the specified
1999  * range. Returns REDISMODULE_OK if the iterator was correctly initialized
2000  * otherwise REDISMODULE_ERR is returned in the following conditions:
2001  *
2002  * 1. The value stored at key is not a sorted set or the key is empty.
2003  *
2004  * The range is specified according to the two double values 'min' and 'max'.
2005  * Both can be infinite using the following two macros:
2006  *
2007  * REDISMODULE_POSITIVE_INFINITE for positive infinite value
2008  * REDISMODULE_NEGATIVE_INFINITE for negative infinite value
2009  *
2010  * 'minex' and 'maxex' parameters, if true, respectively setup a range
2011  * where the min and max value are exclusive (not included) instead of
2012  * inclusive. */
RM_ZsetFirstInScoreRange(RedisModuleKey * key,double min,double max,int minex,int maxex)2013 int RM_ZsetFirstInScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex) {
2014     return zsetInitScoreRange(key,min,max,minex,maxex,1);
2015 }
2016 
2017 /* Exactly like RedisModule_ZsetFirstInScoreRange() but the last element of
2018  * the range is selected for the start of the iteration instead. */
RM_ZsetLastInScoreRange(RedisModuleKey * key,double min,double max,int minex,int maxex)2019 int RM_ZsetLastInScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex) {
2020     return zsetInitScoreRange(key,min,max,minex,maxex,0);
2021 }
2022 
2023 /* Helper function for RM_ZsetFirstInLexRange() and RM_ZsetLastInLexRange().
2024  * Setup the sorted set iteration according to the specified lexicographical
2025  * range (see the functions calling it for more info). If 'first' is true the
2026  * first element in the range is used as a starting point for the iterator
2027  * otherwise the last. Return REDISMODULE_OK on success otherwise
2028  * REDISMODULE_ERR.
2029  *
2030  * Note that this function takes 'min' and 'max' in the same form of the
2031  * Redis ZRANGEBYLEX command. */
zsetInitLexRange(RedisModuleKey * key,RedisModuleString * min,RedisModuleString * max,int first)2032 int zsetInitLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max, int first) {
2033     if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
2034 
2035     RM_ZsetRangeStop(key);
2036     key->zer = 0;
2037 
2038     /* Setup the range structure used by the sorted set core implementation
2039      * in order to seek at the specified element. */
2040     zlexrangespec *zlrs = &key->zlrs;
2041     if (zslParseLexRange(min, max, zlrs) == C_ERR) return REDISMODULE_ERR;
2042 
2043     /* Set the range type to lex only after successfully parsing the range,
2044      * otherwise we don't want the zlexrangespec to be freed. */
2045     key->ztype = REDISMODULE_ZSET_RANGE_LEX;
2046 
2047     if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
2048         key->zcurrent = first ? zzlFirstInLexRange(key->value->ptr,zlrs) :
2049                                 zzlLastInLexRange(key->value->ptr,zlrs);
2050     } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
2051         zset *zs = key->value->ptr;
2052         zskiplist *zsl = zs->zsl;
2053         key->zcurrent = first ? zslFirstInLexRange(zsl,zlrs) :
2054                                 zslLastInLexRange(zsl,zlrs);
2055     } else {
2056         serverPanic("Unsupported zset encoding");
2057     }
2058     if (key->zcurrent == NULL) key->zer = 1;
2059 
2060     return REDISMODULE_OK;
2061 }
2062 
2063 /* Setup a sorted set iterator seeking the first element in the specified
2064  * lexicographical range. Returns REDISMODULE_OK if the iterator was correctly
2065  * initialized otherwise REDISMODULE_ERR is returned in the
2066  * following conditions:
2067  *
2068  * 1. The value stored at key is not a sorted set or the key is empty.
2069  * 2. The lexicographical range 'min' and 'max' format is invalid.
2070  *
2071  * 'min' and 'max' should be provided as two RedisModuleString objects
2072  * in the same format as the parameters passed to the ZRANGEBYLEX command.
2073  * The function does not take ownership of the objects, so they can be released
2074  * ASAP after the iterator is setup. */
RM_ZsetFirstInLexRange(RedisModuleKey * key,RedisModuleString * min,RedisModuleString * max)2075 int RM_ZsetFirstInLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max) {
2076     return zsetInitLexRange(key,min,max,1);
2077 }
2078 
2079 /* Exactly like RedisModule_ZsetFirstInLexRange() but the last element of
2080  * the range is selected for the start of the iteration instead. */
RM_ZsetLastInLexRange(RedisModuleKey * key,RedisModuleString * min,RedisModuleString * max)2081 int RM_ZsetLastInLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max) {
2082     return zsetInitLexRange(key,min,max,0);
2083 }
2084 
2085 /* Return the current sorted set element of an active sorted set iterator
2086  * or NULL if the range specified in the iterator does not include any
2087  * element. */
RM_ZsetRangeCurrentElement(RedisModuleKey * key,double * score)2088 RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score) {
2089     RedisModuleString *str;
2090 
2091     if (key->zcurrent == NULL) return NULL;
2092     if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
2093         unsigned char *eptr, *sptr;
2094         eptr = key->zcurrent;
2095         sds ele = ziplistGetObject(eptr);
2096         if (score) {
2097             sptr = ziplistNext(key->value->ptr,eptr);
2098             *score = zzlGetScore(sptr);
2099         }
2100         str = createObject(OBJ_STRING,ele);
2101     } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
2102         zskiplistNode *ln = key->zcurrent;
2103         if (score) *score = ln->score;
2104         str = createStringObject(ln->ele,sdslen(ln->ele));
2105     } else {
2106         serverPanic("Unsupported zset encoding");
2107     }
2108     autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,str);
2109     return str;
2110 }
2111 
2112 /* Go to the next element of the sorted set iterator. Returns 1 if there was
2113  * a next element, 0 if we are already at the latest element or the range
2114  * does not include any item at all. */
RM_ZsetRangeNext(RedisModuleKey * key)2115 int RM_ZsetRangeNext(RedisModuleKey *key) {
2116     if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */
2117 
2118     if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
2119         unsigned char *zl = key->value->ptr;
2120         unsigned char *eptr = key->zcurrent;
2121         unsigned char *next;
2122         next = ziplistNext(zl,eptr); /* Skip element. */
2123         if (next) next = ziplistNext(zl,next); /* Skip score. */
2124         if (next == NULL) {
2125             key->zer = 1;
2126             return 0;
2127         } else {
2128             /* Are we still within the range? */
2129             if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) {
2130                 /* Fetch the next element score for the
2131                  * range check. */
2132                 unsigned char *saved_next = next;
2133                 next = ziplistNext(zl,next); /* Skip next element. */
2134                 double score = zzlGetScore(next); /* Obtain the next score. */
2135                 if (!zslValueLteMax(score,&key->zrs)) {
2136                     key->zer = 1;
2137                     return 0;
2138                 }
2139                 next = saved_next;
2140             } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
2141                 if (!zzlLexValueLteMax(next,&key->zlrs)) {
2142                     key->zer = 1;
2143                     return 0;
2144                 }
2145             }
2146             key->zcurrent = next;
2147             return 1;
2148         }
2149     } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
2150         zskiplistNode *ln = key->zcurrent, *next = ln->level[0].forward;
2151         if (next == NULL) {
2152             key->zer = 1;
2153             return 0;
2154         } else {
2155             /* Are we still within the range? */
2156             if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE &&
2157                 !zslValueLteMax(next->score,&key->zrs))
2158             {
2159                 key->zer = 1;
2160                 return 0;
2161             } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
2162                 if (!zslLexValueLteMax(next->ele,&key->zlrs)) {
2163                     key->zer = 1;
2164                     return 0;
2165                 }
2166             }
2167             key->zcurrent = next;
2168             return 1;
2169         }
2170     } else {
2171         serverPanic("Unsupported zset encoding");
2172     }
2173 }
2174 
2175 /* Go to the previous element of the sorted set iterator. Returns 1 if there was
2176  * a previous element, 0 if we are already at the first element or the range
2177  * does not include any item at all. */
RM_ZsetRangePrev(RedisModuleKey * key)2178 int RM_ZsetRangePrev(RedisModuleKey *key) {
2179     if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */
2180 
2181     if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
2182         unsigned char *zl = key->value->ptr;
2183         unsigned char *eptr = key->zcurrent;
2184         unsigned char *prev;
2185         prev = ziplistPrev(zl,eptr); /* Go back to previous score. */
2186         if (prev) prev = ziplistPrev(zl,prev); /* Back to previous ele. */
2187         if (prev == NULL) {
2188             key->zer = 1;
2189             return 0;
2190         } else {
2191             /* Are we still within the range? */
2192             if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) {
2193                 /* Fetch the previous element score for the
2194                  * range check. */
2195                 unsigned char *saved_prev = prev;
2196                 prev = ziplistNext(zl,prev); /* Skip element to get the score.*/
2197                 double score = zzlGetScore(prev); /* Obtain the prev score. */
2198                 if (!zslValueGteMin(score,&key->zrs)) {
2199                     key->zer = 1;
2200                     return 0;
2201                 }
2202                 prev = saved_prev;
2203             } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
2204                 if (!zzlLexValueGteMin(prev,&key->zlrs)) {
2205                     key->zer = 1;
2206                     return 0;
2207                 }
2208             }
2209             key->zcurrent = prev;
2210             return 1;
2211         }
2212     } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
2213         zskiplistNode *ln = key->zcurrent, *prev = ln->backward;
2214         if (prev == NULL) {
2215             key->zer = 1;
2216             return 0;
2217         } else {
2218             /* Are we still within the range? */
2219             if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE &&
2220                 !zslValueGteMin(prev->score,&key->zrs))
2221             {
2222                 key->zer = 1;
2223                 return 0;
2224             } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
2225                 if (!zslLexValueGteMin(prev->ele,&key->zlrs)) {
2226                     key->zer = 1;
2227                     return 0;
2228                 }
2229             }
2230             key->zcurrent = prev;
2231             return 1;
2232         }
2233     } else {
2234         serverPanic("Unsupported zset encoding");
2235     }
2236 }
2237 
2238 /* --------------------------------------------------------------------------
2239  * Key API for Hash type
2240  * -------------------------------------------------------------------------- */
2241 
2242 /* Set the field of the specified hash field to the specified value.
2243  * If the key is an empty key open for writing, it is created with an empty
2244  * hash value, in order to set the specified field.
2245  *
2246  * The function is variadic and the user must specify pairs of field
2247  * names and values, both as RedisModuleString pointers (unless the
2248  * CFIELD option is set, see later). At the end of the field/value-ptr pairs,
2249  * NULL must be specified as last argument to signal the end of the arguments
2250  * in the variadic function.
2251  *
2252  * Example to set the hash argv[1] to the value argv[2]:
2253  *
2254  *      RedisModule_HashSet(key,REDISMODULE_HASH_NONE,argv[1],argv[2],NULL);
2255  *
2256  * The function can also be used in order to delete fields (if they exist)
2257  * by setting them to the specified value of REDISMODULE_HASH_DELETE:
2258  *
2259  *      RedisModule_HashSet(key,REDISMODULE_HASH_NONE,argv[1],
2260  *                          REDISMODULE_HASH_DELETE,NULL);
2261  *
2262  * The behavior of the command changes with the specified flags, that can be
2263  * set to REDISMODULE_HASH_NONE if no special behavior is needed.
2264  *
2265  *     REDISMODULE_HASH_NX: The operation is performed only if the field was not
2266  *                          already existing in the hash.
2267  *     REDISMODULE_HASH_XX: The operation is performed only if the field was
2268  *                          already existing, so that a new value could be
2269  *                          associated to an existing filed, but no new fields
2270  *                          are created.
2271  *     REDISMODULE_HASH_CFIELDS: The field names passed are null terminated C
2272  *                               strings instead of RedisModuleString objects.
2273  *
2274  * Unless NX is specified, the command overwrites the old field value with
2275  * the new one.
2276  *
2277  * When using REDISMODULE_HASH_CFIELDS, field names are reported using
2278  * normal C strings, so for example to delete the field "foo" the following
2279  * code can be used:
2280  *
2281  *      RedisModule_HashSet(key,REDISMODULE_HASH_CFIELDS,"foo",
2282  *                          REDISMODULE_HASH_DELETE,NULL);
2283  *
2284  * Return value:
2285  *
2286  * The number of fields updated (that may be less than the number of fields
2287  * specified because of the XX or NX options).
2288  *
2289  * In the following case the return value is always zero:
2290  *
2291  * * The key was not open for writing.
2292  * * The key was associated with a non Hash value.
2293  */
RM_HashSet(RedisModuleKey * key,int flags,...)2294 int RM_HashSet(RedisModuleKey *key, int flags, ...) {
2295     va_list ap;
2296     if (!(key->mode & REDISMODULE_WRITE)) return 0;
2297     if (key->value && key->value->type != OBJ_HASH) return 0;
2298     if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_HASH);
2299 
2300     int updated = 0;
2301     va_start(ap, flags);
2302     while(1) {
2303         RedisModuleString *field, *value;
2304         /* Get the field and value objects. */
2305         if (flags & REDISMODULE_HASH_CFIELDS) {
2306             char *cfield = va_arg(ap,char*);
2307             if (cfield == NULL) break;
2308             field = createRawStringObject(cfield,strlen(cfield));
2309         } else {
2310             field = va_arg(ap,RedisModuleString*);
2311             if (field == NULL) break;
2312         }
2313         value = va_arg(ap,RedisModuleString*);
2314 
2315         /* Handle XX and NX */
2316         if (flags & (REDISMODULE_HASH_XX|REDISMODULE_HASH_NX)) {
2317             int exists = hashTypeExists(key->value, field->ptr);
2318             if (((flags & REDISMODULE_HASH_XX) && !exists) ||
2319                 ((flags & REDISMODULE_HASH_NX) && exists))
2320             {
2321                 if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
2322                 continue;
2323             }
2324         }
2325 
2326         /* Handle deletion if value is REDISMODULE_HASH_DELETE. */
2327         if (value == REDISMODULE_HASH_DELETE) {
2328             updated += hashTypeDelete(key->value, field->ptr);
2329             if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
2330             continue;
2331         }
2332 
2333         int low_flags = HASH_SET_COPY;
2334         /* If CFIELDS is active, we can pass the ownership of the
2335          * SDS object to the low level function that sets the field
2336          * to avoid a useless copy. */
2337         if (flags & REDISMODULE_HASH_CFIELDS)
2338             low_flags |= HASH_SET_TAKE_FIELD;
2339 
2340         robj *argv[2] = {field,value};
2341         hashTypeTryConversion(key->value,argv,0,1);
2342         updated += hashTypeSet(key->value, field->ptr, value->ptr, low_flags);
2343 
2344         /* If CFIELDS is active, SDS string ownership is now of hashTypeSet(),
2345          * however we still have to release the 'field' object shell. */
2346         if (flags & REDISMODULE_HASH_CFIELDS) {
2347            field->ptr = NULL; /* Prevent the SDS string from being freed. */
2348            decrRefCount(field);
2349         }
2350     }
2351     va_end(ap);
2352     moduleDelKeyIfEmpty(key);
2353     return updated;
2354 }
2355 
2356 /* Get fields from an hash value. This function is called using a variable
2357  * number of arguments, alternating a field name (as a StringRedisModule
2358  * pointer) with a pointer to a StringRedisModule pointer, that is set to the
2359  * value of the field if the field exist, or NULL if the field did not exist.
2360  * At the end of the field/value-ptr pairs, NULL must be specified as last
2361  * argument to signal the end of the arguments in the variadic function.
2362  *
2363  * This is an example usage:
2364  *
2365  *      RedisModuleString *first, *second;
2366  *      RedisModule_HashGet(mykey,REDISMODULE_HASH_NONE,argv[1],&first,
2367  *                      argv[2],&second,NULL);
2368  *
2369  * As with RedisModule_HashSet() the behavior of the command can be specified
2370  * passing flags different than REDISMODULE_HASH_NONE:
2371  *
2372  * REDISMODULE_HASH_CFIELD: field names as null terminated C strings.
2373  *
2374  * REDISMODULE_HASH_EXISTS: instead of setting the value of the field
2375  * expecting a RedisModuleString pointer to pointer, the function just
2376  * reports if the field esists or not and expects an integer pointer
2377  * as the second element of each pair.
2378  *
2379  * Example of REDISMODULE_HASH_CFIELD:
2380  *
2381  *      RedisModuleString *username, *hashedpass;
2382  *      RedisModule_HashGet(mykey,"username",&username,"hp",&hashedpass, NULL);
2383  *
2384  * Example of REDISMODULE_HASH_EXISTS:
2385  *
2386  *      int exists;
2387  *      RedisModule_HashGet(mykey,argv[1],&exists,NULL);
2388  *
2389  * The function returns REDISMODULE_OK on success and REDISMODULE_ERR if
2390  * the key is not an hash value.
2391  *
2392  * Memory management:
2393  *
2394  * The returned RedisModuleString objects should be released with
2395  * RedisModule_FreeString(), or by enabling automatic memory management.
2396  */
RM_HashGet(RedisModuleKey * key,int flags,...)2397 int RM_HashGet(RedisModuleKey *key, int flags, ...) {
2398     va_list ap;
2399     if (key->value && key->value->type != OBJ_HASH) return REDISMODULE_ERR;
2400 
2401     va_start(ap, flags);
2402     while(1) {
2403         RedisModuleString *field, **valueptr;
2404         int *existsptr;
2405         /* Get the field object and the value pointer to pointer. */
2406         if (flags & REDISMODULE_HASH_CFIELDS) {
2407             char *cfield = va_arg(ap,char*);
2408             if (cfield == NULL) break;
2409             field = createRawStringObject(cfield,strlen(cfield));
2410         } else {
2411             field = va_arg(ap,RedisModuleString*);
2412             if (field == NULL) break;
2413         }
2414 
2415         /* Query the hash for existence or value object. */
2416         if (flags & REDISMODULE_HASH_EXISTS) {
2417             existsptr = va_arg(ap,int*);
2418             if (key->value)
2419                 *existsptr = hashTypeExists(key->value,field->ptr);
2420             else
2421                 *existsptr = 0;
2422         } else {
2423             valueptr = va_arg(ap,RedisModuleString**);
2424             if (key->value) {
2425                 *valueptr = hashTypeGetValueObject(key->value,field->ptr);
2426                 if (*valueptr) {
2427                     robj *decoded = getDecodedObject(*valueptr);
2428                     decrRefCount(*valueptr);
2429                     *valueptr = decoded;
2430                 }
2431                 if (*valueptr)
2432                     autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,*valueptr);
2433             } else {
2434                 *valueptr = NULL;
2435             }
2436         }
2437 
2438         /* Cleanup */
2439         if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
2440     }
2441     va_end(ap);
2442     return REDISMODULE_OK;
2443 }
2444 
2445 /* --------------------------------------------------------------------------
2446  * Redis <-> Modules generic Call() API
2447  * -------------------------------------------------------------------------- */
2448 
2449 /* Create a new RedisModuleCallReply object. The processing of the reply
2450  * is lazy, the object is just populated with the raw protocol and later
2451  * is processed as needed. Initially we just make sure to set the right
2452  * reply type, which is extremely cheap to do. */
moduleCreateCallReplyFromProto(RedisModuleCtx * ctx,sds proto)2453 RedisModuleCallReply *moduleCreateCallReplyFromProto(RedisModuleCtx *ctx, sds proto) {
2454     RedisModuleCallReply *reply = zmalloc(sizeof(*reply));
2455     reply->ctx = ctx;
2456     reply->proto = proto;
2457     reply->protolen = sdslen(proto);
2458     reply->flags = REDISMODULE_REPLYFLAG_TOPARSE; /* Lazy parsing. */
2459     switch(proto[0]) {
2460     case '$':
2461     case '+': reply->type = REDISMODULE_REPLY_STRING; break;
2462     case '-': reply->type = REDISMODULE_REPLY_ERROR; break;
2463     case ':': reply->type = REDISMODULE_REPLY_INTEGER; break;
2464     case '*': reply->type = REDISMODULE_REPLY_ARRAY; break;
2465     default: reply->type = REDISMODULE_REPLY_UNKNOWN; break;
2466     }
2467     if ((proto[0] == '*' || proto[0] == '$') && proto[1] == '-')
2468         reply->type = REDISMODULE_REPLY_NULL;
2469     return reply;
2470 }
2471 
2472 void moduleParseCallReply_Int(RedisModuleCallReply *reply);
2473 void moduleParseCallReply_BulkString(RedisModuleCallReply *reply);
2474 void moduleParseCallReply_SimpleString(RedisModuleCallReply *reply);
2475 void moduleParseCallReply_Array(RedisModuleCallReply *reply);
2476 
2477 /* Do nothing if REDISMODULE_REPLYFLAG_TOPARSE is false, otherwise
2478  * use the protcol of the reply in reply->proto in order to fill the
2479  * reply with parsed data according to the reply type. */
moduleParseCallReply(RedisModuleCallReply * reply)2480 void moduleParseCallReply(RedisModuleCallReply *reply) {
2481     if (!(reply->flags & REDISMODULE_REPLYFLAG_TOPARSE)) return;
2482     reply->flags &= ~REDISMODULE_REPLYFLAG_TOPARSE;
2483 
2484     switch(reply->proto[0]) {
2485     case ':': moduleParseCallReply_Int(reply); break;
2486     case '$': moduleParseCallReply_BulkString(reply); break;
2487     case '-': /* handled by next item. */
2488     case '+': moduleParseCallReply_SimpleString(reply); break;
2489     case '*': moduleParseCallReply_Array(reply); break;
2490     }
2491 }
2492 
/* Parse an integer reply (":<number>\r\n"): store the number in
 * reply->val.ll and record how many protocol bytes the reply spans. */
void moduleParseCallReply_Int(RedisModuleCallReply *reply) {
    char *digits = reply->proto+1;        /* Skip the ':' marker. */
    char *cr = strchr(digits,'\r');

    string2ll(digits,cr-digits,&reply->val.ll);
    reply->protolen = (cr-reply->proto)+2; /* +2 to include the CRLF. */
    reply->type = REDISMODULE_REPLY_INTEGER;
}
2501 
/* Parse a bulk reply ("$<len>\r\n<payload>\r\n"). A length of -1 turns the
 * reply into a null reply spanning just the header. Otherwise reply->val.str
 * is pointed at the payload inside the protocol buffer (no copy is made)
 * and reply->len is set to the payload length. */
void moduleParseCallReply_BulkString(RedisModuleCallReply *reply) {
    char *proto = reply->proto;
    char *lenstr = proto+1;               /* Skip the '$' marker. */
    char *cr = strchr(lenstr,'\r');
    char *payload = cr+2;                 /* First byte after the CRLF. */
    long long bulklen;

    string2ll(lenstr,cr-lenstr,&bulklen);
    if (bulklen == -1) {
        reply->protolen = payload-proto;
        reply->type = REDISMODULE_REPLY_NULL;
        return;
    }

    reply->val.str = payload;
    reply->len = bulklen;
    /* Header, payload and the trailing CRLF. */
    reply->protolen = (payload-proto)+bulklen+2;
    reply->type = REDISMODULE_REPLY_STRING;
}
2518 
/* Parse a single line reply: a status reply ("+<text>\r\n") becomes a
 * string reply, anything else handled here becomes an error reply. In both
 * cases reply->val.str points at the text inside the protocol buffer (no
 * copy is made) and reply->len is the text length without the CRLF. */
void moduleParseCallReply_SimpleString(RedisModuleCallReply *reply) {
    char *proto = reply->proto;
    char *text = proto+1;                 /* Skip the marker byte. */
    char *cr = strchr(text,'\r');

    reply->val.str = text;
    reply->len = cr-text;
    reply->protolen = (cr-proto)+2;       /* +2 to include the CRLF. */
    if (proto[0] == '+')
        reply->type = REDISMODULE_REPLY_STRING;
    else
        reply->type = REDISMODULE_REPLY_ERROR;
}
2529 
/* Parse a multi bulk reply ("*<count>\r\n" followed by <count> replies).
 * A count of -1 turns the reply into a null reply. Otherwise an array of
 * <count> nested reply structures is allocated, and every element is parsed
 * immediately: this is needed in order to know where the next element
 * starts inside the protocol buffer shared with the parent. */
void moduleParseCallReply_Array(RedisModuleCallReply *reply) {
    char *proto = reply->proto;
    char *countstr = proto+1;             /* Skip the '*' marker. */
    char *cr = strchr(countstr,'\r');
    long long arraylen;

    string2ll(countstr,cr-countstr,&arraylen);
    char *cursor = cr+2;                  /* First byte after the header. */

    if (arraylen == -1) {
        reply->protolen = cursor-proto;
        reply->type = REDISMODULE_REPLY_NULL;
        return;
    }

    reply->val.array = zmalloc(sizeof(RedisModuleCallReply)*arraylen);
    reply->len = arraylen;
    for (long long idx = 0; idx < arraylen; idx++) {
        RedisModuleCallReply *child = &reply->val.array[idx];
        child->flags = REDISMODULE_REPLYFLAG_NESTED |
                       REDISMODULE_REPLYFLAG_TOPARSE;
        child->proto = cursor;
        child->ctx = reply->ctx;
        moduleParseCallReply(child);
        cursor += child->protolen;        /* Jump to the next element. */
    }
    reply->protolen = cursor-proto;
    reply->type = REDISMODULE_REPLY_ARRAY;
}
2558 
2559 /* Free a Call reply and all the nested replies it contains if it's an
2560  * array. */
RM_FreeCallReply_Rec(RedisModuleCallReply * reply,int freenested)2561 void RM_FreeCallReply_Rec(RedisModuleCallReply *reply, int freenested){
2562     /* Don't free nested replies by default: the user must always free the
2563      * toplevel reply. However be gentle and don't crash if the module
2564      * misuses the API. */
2565     if (!freenested && reply->flags & REDISMODULE_REPLYFLAG_NESTED) return;
2566 
2567     if (!(reply->flags & REDISMODULE_REPLYFLAG_TOPARSE)) {
2568         if (reply->type == REDISMODULE_REPLY_ARRAY) {
2569             size_t j;
2570             for (j = 0; j < reply->len; j++)
2571                 RM_FreeCallReply_Rec(reply->val.array+j,1);
2572             zfree(reply->val.array);
2573         }
2574     }
2575 
2576     /* For nested replies, we don't free reply->proto (which if not NULL
2577      * references the parent reply->proto buffer), nor the structure
2578      * itself which is allocated as an array of structures, and is freed
2579      * when the array value is released. */
2580     if (!(reply->flags & REDISMODULE_REPLYFLAG_NESTED)) {
2581         if (reply->proto) sdsfree(reply->proto);
2582         zfree(reply);
2583     }
2584 }
2585 
2586 /* Wrapper for the recursive free reply function. This is needed in order
2587  * to have the first level function to return on nested replies, but only
2588  * if called by the module API. */
RM_FreeCallReply(RedisModuleCallReply * reply)2589 void RM_FreeCallReply(RedisModuleCallReply *reply) {
2590 
2591     RedisModuleCtx *ctx = reply->ctx;
2592     RM_FreeCallReply_Rec(reply,0);
2593     autoMemoryFreed(ctx,REDISMODULE_AM_REPLY,reply);
2594 }
2595 
2596 /* Return the reply type. */
RM_CallReplyType(RedisModuleCallReply * reply)2597 int RM_CallReplyType(RedisModuleCallReply *reply) {
2598     if (!reply) return REDISMODULE_REPLY_UNKNOWN;
2599     return reply->type;
2600 }
2601 
2602 /* Return the reply type length, where applicable. */
RM_CallReplyLength(RedisModuleCallReply * reply)2603 size_t RM_CallReplyLength(RedisModuleCallReply *reply) {
2604     moduleParseCallReply(reply);
2605     switch(reply->type) {
2606     case REDISMODULE_REPLY_STRING:
2607     case REDISMODULE_REPLY_ERROR:
2608     case REDISMODULE_REPLY_ARRAY:
2609         return reply->len;
2610     default:
2611         return 0;
2612     }
2613 }
2614 
2615 /* Return the 'idx'-th nested call reply element of an array reply, or NULL
2616  * if the reply type is wrong or the index is out of range. */
RM_CallReplyArrayElement(RedisModuleCallReply * reply,size_t idx)2617 RedisModuleCallReply *RM_CallReplyArrayElement(RedisModuleCallReply *reply, size_t idx) {
2618     moduleParseCallReply(reply);
2619     if (reply->type != REDISMODULE_REPLY_ARRAY) return NULL;
2620     if (idx >= reply->len) return NULL;
2621     return reply->val.array+idx;
2622 }
2623 
2624 /* Return the long long of an integer reply. */
RM_CallReplyInteger(RedisModuleCallReply * reply)2625 long long RM_CallReplyInteger(RedisModuleCallReply *reply) {
2626     moduleParseCallReply(reply);
2627     if (reply->type != REDISMODULE_REPLY_INTEGER) return LLONG_MIN;
2628     return reply->val.ll;
2629 }
2630 
2631 /* Return the pointer and length of a string or error reply. */
RM_CallReplyStringPtr(RedisModuleCallReply * reply,size_t * len)2632 const char *RM_CallReplyStringPtr(RedisModuleCallReply *reply, size_t *len) {
2633     moduleParseCallReply(reply);
2634     if (reply->type != REDISMODULE_REPLY_STRING &&
2635         reply->type != REDISMODULE_REPLY_ERROR) return NULL;
2636     if (len) *len = reply->len;
2637     return reply->val.str;
2638 }
2639 
2640 /* Return a new string object from a call reply of type string, error or
2641  * integer. Otherwise (wrong reply type) return NULL. */
RM_CreateStringFromCallReply(RedisModuleCallReply * reply)2642 RedisModuleString *RM_CreateStringFromCallReply(RedisModuleCallReply *reply) {
2643     moduleParseCallReply(reply);
2644     switch(reply->type) {
2645     case REDISMODULE_REPLY_STRING:
2646     case REDISMODULE_REPLY_ERROR:
2647         return RM_CreateString(reply->ctx,reply->val.str,reply->len);
2648     case REDISMODULE_REPLY_INTEGER: {
2649         char buf[64];
2650         int len = ll2string(buf,sizeof(buf),reply->val.ll);
2651         return RM_CreateString(reply->ctx,buf,len);
2652         }
2653     default: return NULL;
2654     }
2655 }
2656 
2657 /* Returns an array of robj pointers, and populates *argc with the number
2658  * of items, by parsing the format specifier "fmt" as described for
2659  * the RM_Call(), RM_Replicate() and other module APIs.
2660  *
2661  * The integer pointed by 'flags' is populated with flags according
2662  * to special modifiers in "fmt". For now only one exists:
2663  *
2664  *     "!" -> REDISMODULE_ARGV_REPLICATE
2665  *
2666  * On error (format specifier error) NULL is returned and nothing is
2667  * allocated. On success the argument vector is returned. */
2668 
2669 #define REDISMODULE_ARGV_REPLICATE (1<<0)
2670 
moduleCreateArgvFromUserFormat(const char * cmdname,const char * fmt,int * argcp,int * flags,va_list ap)2671 robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap) {
2672     int argc = 0, argv_size, j;
2673     robj **argv = NULL;
2674 
2675     /* As a first guess to avoid useless reallocations, size argv to
2676      * hold one argument for each char specifier in 'fmt'. */
2677     argv_size = strlen(fmt)+1; /* +1 because of the command name. */
2678     argv = zrealloc(argv,sizeof(robj*)*argv_size);
2679 
2680     /* Build the arguments vector based on the format specifier. */
2681     argv[0] = createStringObject(cmdname,strlen(cmdname));
2682     argc++;
2683 
2684     /* Create the client and dispatch the command. */
2685     const char *p = fmt;
2686     while(*p) {
2687         if (*p == 'c') {
2688             char *cstr = va_arg(ap,char*);
2689             argv[argc++] = createStringObject(cstr,strlen(cstr));
2690         } else if (*p == 's') {
2691             robj *obj = va_arg(ap,void*);
2692             argv[argc++] = obj;
2693             incrRefCount(obj);
2694         } else if (*p == 'b') {
2695             char *buf = va_arg(ap,char*);
2696             size_t len = va_arg(ap,size_t);
2697             argv[argc++] = createStringObject(buf,len);
2698         } else if (*p == 'l') {
2699             long ll = va_arg(ap,long long);
2700             argv[argc++] = createObject(OBJ_STRING,sdsfromlonglong(ll));
2701         } else if (*p == 'v') {
2702              /* A vector of strings */
2703              robj **v = va_arg(ap, void*);
2704              size_t vlen = va_arg(ap, size_t);
2705 
2706              /* We need to grow argv to hold the vector's elements.
2707               * We resize by vector_len-1 elements, because we held
2708               * one element in argv for the vector already */
2709              argv_size += vlen-1;
2710              argv = zrealloc(argv,sizeof(robj*)*argv_size);
2711 
2712              size_t i = 0;
2713              for (i = 0; i < vlen; i++) {
2714                  incrRefCount(v[i]);
2715                  argv[argc++] = v[i];
2716              }
2717         } else if (*p == '!') {
2718             if (flags) (*flags) |= REDISMODULE_ARGV_REPLICATE;
2719         } else {
2720             goto fmterr;
2721         }
2722         p++;
2723     }
2724     *argcp = argc;
2725     return argv;
2726 
2727 fmterr:
2728     for (j = 0; j < argc; j++)
2729         decrRefCount(argv[j]);
2730     zfree(argv);
2731     return NULL;
2732 }
2733 
2734 /* Exported API to call any Redis command from modules.
2735  * On success a RedisModuleCallReply object is returned, otherwise
2736  * NULL is returned and errno is set to the following values:
2737  *
2738  * EINVAL: command non existing, wrong arity, wrong format specifier.
2739  * EPERM:  operation in Cluster instance with key in non local slot. */
RM_Call(RedisModuleCtx * ctx,const char * cmdname,const char * fmt,...)2740 RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
2741     struct redisCommand *cmd;
2742     client *c = NULL;
2743     robj **argv = NULL;
2744     int argc = 0, flags = 0;
2745     va_list ap;
2746     RedisModuleCallReply *reply = NULL;
2747     int replicate = 0; /* Replicate this command? */
2748 
2749     /* Create the client and dispatch the command. */
2750     va_start(ap, fmt);
2751     c = createClient(-1);
2752     argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
2753     replicate = flags & REDISMODULE_ARGV_REPLICATE;
2754     va_end(ap);
2755 
2756     /* Setup our fake client for command execution. */
2757     c->flags |= CLIENT_MODULE;
2758     c->db = ctx->client->db;
2759     c->argv = argv;
2760     c->argc = argc;
2761     if (ctx->module) ctx->module->in_call++;
2762 
2763     /* We handle the above format error only when the client is setup so that
2764      * we can free it normally. */
2765     if (argv == NULL) goto cleanup;
2766 
2767     /* Call command filters */
2768     moduleCallCommandFilters(c);
2769 
2770     /* Lookup command now, after filters had a chance to make modifications
2771      * if necessary.
2772      */
2773     cmd = lookupCommand(c->argv[0]->ptr);
2774     if (!cmd) {
2775         errno = EINVAL;
2776         goto cleanup;
2777     }
2778     c->cmd = c->lastcmd = cmd;
2779 
2780     /* Basic arity checks. */
2781     if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) {
2782         errno = EINVAL;
2783         goto cleanup;
2784     }
2785 
2786     /* If this is a Redis Cluster node, we need to make sure the module is not
2787      * trying to access non-local keys, with the exception of commands
2788      * received from our master. */
2789     if (server.cluster_enabled && !(ctx->client->flags & CLIENT_MASTER)) {
2790         /* Duplicate relevant flags in the module client. */
2791         c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
2792         c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING);
2793         if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,NULL) !=
2794                            server.cluster->myself)
2795         {
2796             errno = EPERM;
2797             goto cleanup;
2798         }
2799     }
2800 
2801     /* If we are using single commands replication, we need to wrap what
2802      * we propagate into a MULTI/EXEC block, so that it will be atomic like
2803      * a Lua script in the context of AOF and slaves. */
2804     if (replicate) moduleReplicateMultiIfNeeded(ctx);
2805 
2806     /* Run the command */
2807     int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
2808     if (replicate) {
2809         call_flags |= CMD_CALL_PROPAGATE_AOF;
2810         call_flags |= CMD_CALL_PROPAGATE_REPL;
2811     }
2812     call(c,call_flags);
2813 
2814     /* Convert the result of the Redis command into a suitable Lua type.
2815      * The first thing we need is to create a single string from the client
2816      * output buffers. */
2817     sds proto = sdsnewlen(c->buf,c->bufpos);
2818     c->bufpos = 0;
2819     while(listLength(c->reply)) {
2820         clientReplyBlock *o = listNodeValue(listFirst(c->reply));
2821 
2822         proto = sdscatlen(proto,o->buf,o->used);
2823         listDelNode(c->reply,listFirst(c->reply));
2824     }
2825     reply = moduleCreateCallReplyFromProto(ctx,proto);
2826     autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply);
2827 
2828 cleanup:
2829     if (ctx->module) ctx->module->in_call--;
2830     freeClient(c);
2831     return reply;
2832 }
2833 
2834 /* Return a pointer, and a length, to the protocol returned by the command
2835  * that returned the reply object. */
RM_CallReplyProto(RedisModuleCallReply * reply,size_t * len)2836 const char *RM_CallReplyProto(RedisModuleCallReply *reply, size_t *len) {
2837     if (reply->proto) *len = sdslen(reply->proto);
2838     return reply->proto;
2839 }
2840 
2841 /* --------------------------------------------------------------------------
2842  * Modules data types
2843  *
2844  * When String DMA or using existing data structures is not enough, it is
2845  * possible to create new data types from scratch and export them to
2846  * Redis. The module must provide a set of callbacks for handling the
2847  * new values exported (for example in order to provide RDB saving/loading,
2848  * AOF rewrite, and so forth). In this section we define this API.
2849  * -------------------------------------------------------------------------- */
2850 
2851 /* Turn a 9 chars name in the specified charset and a 10 bit encver into
2852  * a single 64 bit unsigned integer that represents this exact module name
2853  * and version. This final number is called a "type ID" and is used when
2854  * writing module exported values to RDB files, in order to re-associate the
2855  * value to the right module to load them during RDB loading.
2856  *
2857  * If the string is not of the right length or the charset is wrong, or
2858  * if encver is outside the unsigned 10 bit integer range, 0 is returned,
2859  * otherwise the function returns the right type ID.
2860  *
2861  * The resulting 64 bit integer is composed as follows:
2862  *
2863  *     (high order bits) 6|6|6|6|6|6|6|6|6|10 (low order bits)
2864  *
2865  * The first 6 bits value is the first character, name[0], while the last
2866  * 6 bits value, immediately before the 10 bits integer, is name[8].
2867  * The last 10 bits are the encoding version.
2868  *
 * Note that a name and encver combo of "AAAAAAAAA" and 0 will produce
 * zero as return value, which is the same value we use to signal errors,
 * thus this combination is invalid, and also useless since type names
 * should be varied in order to avoid collisions. */
2873 
const char *ModuleTypeNameCharSet =
             "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
             "abcdefghijklmnopqrstuvwxyz"
             "0123456789-_";

/* Pack a 9 characters type name and a 10 bit encoding version into a 64 bit
 * type ID. Returns 0 when 'name' is not exactly 9 characters long, when
 * 'encver' is outside the 0-1023 range, or when a character of 'name' does
 * not belong to ModuleTypeNameCharSet. */
uint64_t moduleTypeEncodeId(const char *name, int encver) {
    /* The charset has 64 symbols, so the index of every character fits
     * in exactly 6 bits of the final output. */
    const char *alphabet = ModuleTypeNameCharSet;
    if (strlen(name) != 9) return 0;
    if (encver < 0 || encver > 1023) return 0;

    uint64_t encoded = 0;
    const char *end = name + 9;
    for (const char *c = name; c != end; c++) {
        const char *hit = strchr(alphabet,*c);
        if (hit == NULL) return 0;
        /* Make room for the next symbol, then append its index. */
        encoded <<= 6;
        encoded |= (unsigned long)(hit - alphabet);
    }

    /* The low 10 bits carry the encoding version. */
    return (encoded << 10) | encver;
}
2896 
2897 /* Search, in the list of exported data types of all the modules registered,
2898  * a type with the same name as the one given. Returns the moduleType
2899  * structure pointer if such a module is found, or NULL otherwise. */
moduleTypeLookupModuleByName(const char * name)2900 moduleType *moduleTypeLookupModuleByName(const char *name) {
2901     dictIterator *di = dictGetIterator(modules);
2902     dictEntry *de;
2903 
2904     while ((de = dictNext(di)) != NULL) {
2905         struct RedisModule *module = dictGetVal(de);
2906         listIter li;
2907         listNode *ln;
2908 
2909         listRewind(module->types,&li);
2910         while((ln = listNext(&li))) {
2911             moduleType *mt = ln->value;
2912             if (memcmp(name,mt->name,sizeof(mt->name)) == 0) {
2913                 dictReleaseIterator(di);
2914                 return mt;
2915             }
2916         }
2917     }
2918     dictReleaseIterator(di);
2919     return NULL;
2920 }
2921 
2922 /* Lookup a module by ID, with caching. This function is used during RDB
2923  * loading. Modules exporting data types should never be able to unload, so
2924  * our cache does not need to expire. */
2925 #define MODULE_LOOKUP_CACHE_SIZE 3
2926 
moduleTypeLookupModuleByID(uint64_t id)2927 moduleType *moduleTypeLookupModuleByID(uint64_t id) {
2928     static struct {
2929         uint64_t id;
2930         moduleType *mt;
2931     } cache[MODULE_LOOKUP_CACHE_SIZE];
2932 
2933     /* Search in cache to start. */
2934     int j;
2935     for (j = 0; j < MODULE_LOOKUP_CACHE_SIZE && cache[j].mt != NULL; j++)
2936         if (cache[j].id == id) return cache[j].mt;
2937 
2938     /* Slow module by module lookup. */
2939     moduleType *mt = NULL;
2940     dictIterator *di = dictGetIterator(modules);
2941     dictEntry *de;
2942 
2943     while ((de = dictNext(di)) != NULL && mt == NULL) {
2944         struct RedisModule *module = dictGetVal(de);
2945         listIter li;
2946         listNode *ln;
2947 
2948         listRewind(module->types,&li);
2949         while((ln = listNext(&li))) {
2950             moduleType *this_mt = ln->value;
2951             /* Compare only the 54 bit module identifier and not the
2952              * encoding version. */
2953             if (this_mt->id >> 10 == id >> 10) {
2954                 mt = this_mt;
2955                 break;
2956             }
2957         }
2958     }
2959     dictReleaseIterator(di);
2960 
2961     /* Add to cache if possible. */
2962     if (mt && j < MODULE_LOOKUP_CACHE_SIZE) {
2963         cache[j].id = id;
2964         cache[j].mt = mt;
2965     }
2966     return mt;
2967 }
2968 
2969 /* Turn an (unresolved) module ID into a type name, to show the user an
2970  * error when RDB files contain module data we can't load.
2971  * The buffer pointed by 'name' must be 10 bytes at least. The function will
2972  * fill it with a null terminated module name. */
moduleTypeNameByID(char * name,uint64_t moduleid)2973 void moduleTypeNameByID(char *name, uint64_t moduleid) {
2974     const char *cset = ModuleTypeNameCharSet;
2975 
2976     name[9] = '\0';
2977     char *p = name+8;
2978     moduleid >>= 10;
2979     for (int j = 0; j < 9; j++) {
2980         *p-- = cset[moduleid & 63];
2981         moduleid >>= 6;
2982     }
2983 }
2984 
2985 /* Register a new data type exported by the module. The parameters are the
2986  * following. Please for in depth documentation check the modules API
2987  * documentation, especially the TYPES.md file.
2988  *
 * * **name**: A 9 characters data type name that MUST be unique in the Redis
 *   Modules ecosystem. Be creative... and there will be no collisions. Use
 *   the charset A-Z a-z 0-9, plus the two "-_" characters. A good
 *   idea is to use, for example `<typename>-<vendor>`. For example
 *   "tree-AntZ" may mean "Tree data structure by @antirez". Using both
 *   lower case and upper case letters helps to prevent collisions.
2995  * * **encver**: Encoding version, which is, the version of the serialization
2996  *   that a module used in order to persist data. As long as the "name"
2997  *   matches, the RDB loading will be dispatched to the type callbacks
2998  *   whatever 'encver' is used, however the module can understand if
2999  *   the encoding it must load are of an older version of the module.
3000  *   For example the module "tree-AntZ" initially used encver=0. Later
3001  *   after an upgrade, it started to serialize data in a different format
3002  *   and to register the type with encver=1. However this module may
3003  *   still load old data produced by an older version if the rdb_load
3004  *   callback is able to check the encver value and act accordingly.
3005  *   The encver must be a positive value between 0 and 1023.
3006  * * **typemethods_ptr** is a pointer to a RedisModuleTypeMethods structure
3007  *   that should be populated with the methods callbacks and structure
3008  *   version, like in the following example:
3009  *
3010  *      RedisModuleTypeMethods tm = {
3011  *          .version = REDISMODULE_TYPE_METHOD_VERSION,
3012  *          .rdb_load = myType_RDBLoadCallBack,
3013  *          .rdb_save = myType_RDBSaveCallBack,
3014  *          .aof_rewrite = myType_AOFRewriteCallBack,
3015  *          .free = myType_FreeCallBack,
3016  *
3017  *          // Optional fields
3018  *          .digest = myType_DigestCallBack,
3019  *          .mem_usage = myType_MemUsageCallBack,
3020  *      }
3021  *
3022  * * **rdb_load**: A callback function pointer that loads data from RDB files.
3023  * * **rdb_save**: A callback function pointer that saves data to RDB files.
3024  * * **aof_rewrite**: A callback function pointer that rewrites data as commands.
3025  * * **digest**: A callback function pointer that is used for `DEBUG DIGEST`.
3026  * * **free**: A callback function pointer that can free a type value.
3027  *
 * The **digest** and **mem_usage** methods should currently be omitted since
 * they are not yet implemented inside the Redis modules core.
3030  *
3031  * Note: the module name "AAAAAAAAA" is reserved and produces an error, it
3032  * happens to be pretty lame as well.
3033  *
 * If there is already a module registering a type with the same name,
 * or if the module name or encver is invalid, NULL is returned.
 * Otherwise the new type is registered into Redis, and a reference of
 * type RedisModuleType is returned: the caller of the function should store
 * this reference into a global variable to make future use of it in the
 * modules type API, since a single module may register multiple types.
3040  * Example code fragment:
3041  *
3042  *      static RedisModuleType *BalancedTreeType;
3043  *
3044  *      int RedisModule_OnLoad(RedisModuleCtx *ctx) {
3045  *          // some code here ...
3046  *          BalancedTreeType = RM_CreateDataType(...);
3047  *      }
3048  */
RM_CreateDataType(RedisModuleCtx * ctx,const char * name,int encver,void * typemethods_ptr)3049 moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, void *typemethods_ptr) {
3050     uint64_t id = moduleTypeEncodeId(name,encver);
3051     if (id == 0) return NULL;
3052     if (moduleTypeLookupModuleByName(name) != NULL) return NULL;
3053 
3054     long typemethods_version = ((long*)typemethods_ptr)[0];
3055     if (typemethods_version == 0) return NULL;
3056 
3057     struct typemethods {
3058         uint64_t version;
3059         moduleTypeLoadFunc rdb_load;
3060         moduleTypeSaveFunc rdb_save;
3061         moduleTypeRewriteFunc aof_rewrite;
3062         moduleTypeMemUsageFunc mem_usage;
3063         moduleTypeDigestFunc digest;
3064         moduleTypeFreeFunc free;
3065     } *tms = (struct typemethods*) typemethods_ptr;
3066 
3067     moduleType *mt = zcalloc(sizeof(*mt));
3068     mt->id = id;
3069     mt->module = ctx->module;
3070     mt->rdb_load = tms->rdb_load;
3071     mt->rdb_save = tms->rdb_save;
3072     mt->aof_rewrite = tms->aof_rewrite;
3073     mt->mem_usage = tms->mem_usage;
3074     mt->digest = tms->digest;
3075     mt->free = tms->free;
3076     memcpy(mt->name,name,sizeof(mt->name));
3077     listAddNodeTail(ctx->module->types,mt);
3078     return mt;
3079 }
3080 
3081 /* If the key is open for writing, set the specified module type object
3082  * as the value of the key, deleting the old value if any.
3083  * On success REDISMODULE_OK is returned. If the key is not open for
3084  * writing or there is an active iterator, REDISMODULE_ERR is returned. */
RM_ModuleTypeSetValue(RedisModuleKey * key,moduleType * mt,void * value)3085 int RM_ModuleTypeSetValue(RedisModuleKey *key, moduleType *mt, void *value) {
3086     if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
3087     RM_DeleteKey(key);
3088     robj *o = createModuleObject(mt,value);
3089     setKey(key->db,key->key,o);
3090     decrRefCount(o);
3091     key->value = o;
3092     return REDISMODULE_OK;
3093 }
3094 
3095 /* Assuming RedisModule_KeyType() returned REDISMODULE_KEYTYPE_MODULE on
3096  * the key, returns the module type pointer of the value stored at key.
3097  *
3098  * If the key is NULL, is not associated with a module type, or is empty,
3099  * then NULL is returned instead. */
RM_ModuleTypeGetType(RedisModuleKey * key)3100 moduleType *RM_ModuleTypeGetType(RedisModuleKey *key) {
3101     if (key == NULL ||
3102         key->value == NULL ||
3103         RM_KeyType(key) != REDISMODULE_KEYTYPE_MODULE) return NULL;
3104     moduleValue *mv = key->value->ptr;
3105     return mv->type;
3106 }
3107 
3108 /* Assuming RedisModule_KeyType() returned REDISMODULE_KEYTYPE_MODULE on
3109  * the key, returns the module type low-level value stored at key, as
3110  * it was set by the user via RedisModule_ModuleTypeSet().
3111  *
3112  * If the key is NULL, is not associated with a module type, or is empty,
3113  * then NULL is returned instead. */
RM_ModuleTypeGetValue(RedisModuleKey * key)3114 void *RM_ModuleTypeGetValue(RedisModuleKey *key) {
3115     if (key == NULL ||
3116         key->value == NULL ||
3117         RM_KeyType(key) != REDISMODULE_KEYTYPE_MODULE) return NULL;
3118     moduleValue *mv = key->value->ptr;
3119     return mv->value;
3120 }
3121 
3122 /* --------------------------------------------------------------------------
3123  * RDB loading and saving functions
3124  * -------------------------------------------------------------------------- */
3125 
3126 /* Called when there is a load error in the context of a module. This cannot
3127  * be recovered like for the built-in types. */
moduleRDBLoadError(RedisModuleIO * io)3128 void moduleRDBLoadError(RedisModuleIO *io) {
3129     serverLog(LL_WARNING,
3130         "Error loading data from RDB (short read or EOF). "
3131         "Read performed by module '%s' about type '%s' "
3132         "after reading '%llu' bytes of a value.",
3133         io->type->module->name,
3134         io->type->name,
3135         (unsigned long long)io->bytes);
3136     exit(1);
3137 }
3138 
3139 /* Save an unsigned 64 bit value into the RDB file. This function should only
3140  * be called in the context of the rdb_save method of modules implementing new
3141  * data types. */
RM_SaveUnsigned(RedisModuleIO * io,uint64_t value)3142 void RM_SaveUnsigned(RedisModuleIO *io, uint64_t value) {
3143     if (io->error) return;
3144     /* Save opcode. */
3145     int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_UINT);
3146     if (retval == -1) goto saveerr;
3147     io->bytes += retval;
3148     /* Save value. */
3149     retval = rdbSaveLen(io->rio, value);
3150     if (retval == -1) goto saveerr;
3151     io->bytes += retval;
3152     return;
3153 
3154 saveerr:
3155     io->error = 1;
3156 }
3157 
3158 /* Load an unsigned 64 bit value from the RDB file. This function should only
3159  * be called in the context of the rdb_load method of modules implementing
3160  * new data types. */
RM_LoadUnsigned(RedisModuleIO * io)3161 uint64_t RM_LoadUnsigned(RedisModuleIO *io) {
3162     if (io->ver == 2) {
3163         uint64_t opcode = rdbLoadLen(io->rio,NULL);
3164         if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr;
3165     }
3166     uint64_t value;
3167     int retval = rdbLoadLenByRef(io->rio, NULL, &value);
3168     if (retval == -1) goto loaderr;
3169     return value;
3170 
3171 loaderr:
3172     moduleRDBLoadError(io);
3173     return 0; /* Never reached. */
3174 }
3175 
3176 /* Like RedisModule_SaveUnsigned() but for signed 64 bit values. */
RM_SaveSigned(RedisModuleIO * io,int64_t value)3177 void RM_SaveSigned(RedisModuleIO *io, int64_t value) {
3178     union {uint64_t u; int64_t i;} conv;
3179     conv.i = value;
3180     RM_SaveUnsigned(io,conv.u);
3181 }
3182 
3183 /* Like RedisModule_LoadUnsigned() but for signed 64 bit values. */
RM_LoadSigned(RedisModuleIO * io)3184 int64_t RM_LoadSigned(RedisModuleIO *io) {
3185     union {uint64_t u; int64_t i;} conv;
3186     conv.u = RM_LoadUnsigned(io);
3187     return conv.i;
3188 }
3189 
3190 /* In the context of the rdb_save method of a module type, saves a
3191  * string into the RDB file taking as input a RedisModuleString.
3192  *
3193  * The string can be later loaded with RedisModule_LoadString() or
3194  * other Load family functions expecting a serialized string inside
3195  * the RDB file. */
RM_SaveString(RedisModuleIO * io,RedisModuleString * s)3196 void RM_SaveString(RedisModuleIO *io, RedisModuleString *s) {
3197     if (io->error) return;
3198     /* Save opcode. */
3199     ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING);
3200     if (retval == -1) goto saveerr;
3201     io->bytes += retval;
3202     /* Save value. */
3203     retval = rdbSaveStringObject(io->rio, s);
3204     if (retval == -1) goto saveerr;
3205     io->bytes += retval;
3206     return;
3207 
3208 saveerr:
3209     io->error = 1;
3210 }
3211 
3212 /* Like RedisModule_SaveString() but takes a raw C pointer and length
3213  * as input. */
RM_SaveStringBuffer(RedisModuleIO * io,const char * str,size_t len)3214 void RM_SaveStringBuffer(RedisModuleIO *io, const char *str, size_t len) {
3215     if (io->error) return;
3216     /* Save opcode. */
3217     ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING);
3218     if (retval == -1) goto saveerr;
3219     io->bytes += retval;
3220     /* Save value. */
3221     retval = rdbSaveRawString(io->rio, (unsigned char*)str,len);
3222     if (retval == -1) goto saveerr;
3223     io->bytes += retval;
3224     return;
3225 
3226 saveerr:
3227     io->error = 1;
3228 }
3229 
3230 /* Implements RM_LoadString() and RM_LoadStringBuffer() */
moduleLoadString(RedisModuleIO * io,int plain,size_t * lenptr)3231 void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) {
3232     if (io->ver == 2) {
3233         uint64_t opcode = rdbLoadLen(io->rio,NULL);
3234         if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr;
3235     }
3236     void *s = rdbGenericLoadStringObject(io->rio,
3237               plain ? RDB_LOAD_PLAIN : RDB_LOAD_NONE, lenptr);
3238     if (s == NULL) goto loaderr;
3239     return s;
3240 
3241 loaderr:
3242     moduleRDBLoadError(io);
3243     return NULL; /* Never reached. */
3244 }
3245 
3246 /* In the context of the rdb_load method of a module data type, loads a string
3247  * from the RDB file, that was previously saved with RedisModule_SaveString()
3248  * functions family.
3249  *
3250  * The returned string is a newly allocated RedisModuleString object, and
3251  * the user should at some point free it with a call to RedisModule_FreeString().
3252  *
3253  * If the data structure does not store strings as RedisModuleString objects,
3254  * the similar function RedisModule_LoadStringBuffer() could be used instead. */
RM_LoadString(RedisModuleIO * io)3255 RedisModuleString *RM_LoadString(RedisModuleIO *io) {
3256     return moduleLoadString(io,0,NULL);
3257 }
3258 
3259 /* Like RedisModule_LoadString() but returns an heap allocated string that
3260  * was allocated with RedisModule_Alloc(), and can be resized or freed with
3261  * RedisModule_Realloc() or RedisModule_Free().
3262  *
3263  * The size of the string is stored at '*lenptr' if not NULL.
3264  * The returned string is not automatically NULL termianted, it is loaded
3265  * exactly as it was stored inisde the RDB file. */
RM_LoadStringBuffer(RedisModuleIO * io,size_t * lenptr)3266 char *RM_LoadStringBuffer(RedisModuleIO *io, size_t *lenptr) {
3267     return moduleLoadString(io,1,lenptr);
3268 }
3269 
3270 /* In the context of the rdb_save method of a module data type, saves a double
3271  * value to the RDB file. The double can be a valid number, a NaN or infinity.
3272  * It is possible to load back the value with RedisModule_LoadDouble(). */
RM_SaveDouble(RedisModuleIO * io,double value)3273 void RM_SaveDouble(RedisModuleIO *io, double value) {
3274     if (io->error) return;
3275     /* Save opcode. */
3276     int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_DOUBLE);
3277     if (retval == -1) goto saveerr;
3278     io->bytes += retval;
3279     /* Save value. */
3280     retval = rdbSaveBinaryDoubleValue(io->rio, value);
3281     if (retval == -1) goto saveerr;
3282     io->bytes += retval;
3283     return;
3284 
3285 saveerr:
3286     io->error = 1;
3287 }
3288 
3289 /* In the context of the rdb_save method of a module data type, loads back the
3290  * double value saved by RedisModule_SaveDouble(). */
RM_LoadDouble(RedisModuleIO * io)3291 double RM_LoadDouble(RedisModuleIO *io) {
3292     if (io->ver == 2) {
3293         uint64_t opcode = rdbLoadLen(io->rio,NULL);
3294         if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr;
3295     }
3296     double value;
3297     int retval = rdbLoadBinaryDoubleValue(io->rio, &value);
3298     if (retval == -1) goto loaderr;
3299     return value;
3300 
3301 loaderr:
3302     moduleRDBLoadError(io);
3303     return 0; /* Never reached. */
3304 }
3305 
3306 /* In the context of the rdb_save method of a module data type, saves a float
3307  * value to the RDB file. The float can be a valid number, a NaN or infinity.
3308  * It is possible to load back the value with RedisModule_LoadFloat(). */
RM_SaveFloat(RedisModuleIO * io,float value)3309 void RM_SaveFloat(RedisModuleIO *io, float value) {
3310     if (io->error) return;
3311     /* Save opcode. */
3312     int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_FLOAT);
3313     if (retval == -1) goto saveerr;
3314     io->bytes += retval;
3315     /* Save value. */
3316     retval = rdbSaveBinaryFloatValue(io->rio, value);
3317     if (retval == -1) goto saveerr;
3318     io->bytes += retval;
3319     return;
3320 
3321 saveerr:
3322     io->error = 1;
3323 }
3324 
3325 /* In the context of the rdb_save method of a module data type, loads back the
3326  * float value saved by RedisModule_SaveFloat(). */
RM_LoadFloat(RedisModuleIO * io)3327 float RM_LoadFloat(RedisModuleIO *io) {
3328     if (io->ver == 2) {
3329         uint64_t opcode = rdbLoadLen(io->rio,NULL);
3330         if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr;
3331     }
3332     float value;
3333     int retval = rdbLoadBinaryFloatValue(io->rio, &value);
3334     if (retval == -1) goto loaderr;
3335     return value;
3336 
3337 loaderr:
3338     moduleRDBLoadError(io);
3339     return 0; /* Never reached. */
3340 }
3341 
3342 /* --------------------------------------------------------------------------
3343  * Key digest API (DEBUG DIGEST interface for modules types)
3344  * -------------------------------------------------------------------------- */
3345 
3346 /* Add a new element to the digest. This function can be called multiple times
3347  * one element after the other, for all the elements that constitute a given
3348  * data structure. The function call must be followed by the call to
3349  * `RedisModule_DigestEndSequence` eventually, when all the elements that are
3350  * always in a given order are added. See the Redis Modules data types
3351  * documentation for more info. However this is a quick example that uses Redis
3352  * data types as an example.
3353  *
3354  * To add a sequence of unordered elements (for example in the case of a Redis
3355  * Set), the pattern to use is:
3356  *
3357  *     foreach element {
3358  *         AddElement(element);
3359  *         EndSequence();
3360  *     }
3361  *
3362  * Because Sets are not ordered, so every element added has a position that
3363  * does not depend from the other. However if instead our elements are
3364  * ordered in pairs, like field-value pairs of an Hash, then one should
3365  * use:
3366  *
3367  *     foreach key,value {
3368  *         AddElement(key);
3369  *         AddElement(value);
 *         EndSequence();
3371  *     }
3372  *
3373  * Because the key and value will be always in the above order, while instead
3374  * the single key-value pairs, can appear in any position into a Redis hash.
3375  *
3376  * A list of ordered elements would be implemented with:
3377  *
3378  *     foreach element {
3379  *         AddElement(element);
3380  *     }
3381  *     EndSequence();
3382  *
3383  */
void RM_DigestAddStringBuffer(RedisModuleDigest *md, unsigned char *ele, size_t len) {
    /* Elements are mixed into md->o, the buffer that
     * RedisModule_DigestEndSequence() later folds into md->x. */
    unsigned char *pending = md->o;
    mixDigest(pending,ele,len);
}
3387 
3388 /* Like `RedisModule_DigestAddStringBuffer()` but takes a long long as input
3389  * that gets converted into a string before adding it to the digest. */
RM_DigestAddLongLong(RedisModuleDigest * md,long long ll)3390 void RM_DigestAddLongLong(RedisModuleDigest *md, long long ll) {
3391     char buf[LONG_STR_SIZE];
3392     size_t len = ll2string(buf,sizeof(buf),ll);
3393     mixDigest(md->o,buf,len);
3394 }
3395 
3396 /* See the documentation for `RedisModule_DigestAddElement()`. */
RM_DigestEndSequence(RedisModuleDigest * md)3397 void RM_DigestEndSequence(RedisModuleDigest *md) {
3398     xorDigest(md->x,md->o,sizeof(md->o));
3399     memset(md->o,0,sizeof(md->o));
3400 }
3401 
3402 /* --------------------------------------------------------------------------
3403  * AOF API for modules data types
3404  * -------------------------------------------------------------------------- */
3405 
3406 /* Emits a command into the AOF during the AOF rewriting process. This function
3407  * is only called in the context of the aof_rewrite method of data types exported
3408  * by a module. The command works exactly like RedisModule_Call() in the way
3409  * the parameters are passed, but it does not return anything as the error
3410  * handling is performed by Redis itself. */
RM_EmitAOF(RedisModuleIO * io,const char * cmdname,const char * fmt,...)3411 void RM_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...) {
3412     if (io->error) return;
3413     struct redisCommand *cmd;
3414     robj **argv = NULL;
3415     int argc = 0, flags = 0, j;
3416     va_list ap;
3417 
3418     cmd = lookupCommandByCString((char*)cmdname);
3419     if (!cmd) {
3420         serverLog(LL_WARNING,
3421             "Fatal: AOF method for module data type '%s' tried to "
3422             "emit unknown command '%s'",
3423             io->type->name, cmdname);
3424         io->error = 1;
3425         errno = EINVAL;
3426         return;
3427     }
3428 
3429     /* Emit the arguments into the AOF in Redis protocol format. */
3430     va_start(ap, fmt);
3431     argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
3432     va_end(ap);
3433     if (argv == NULL) {
3434         serverLog(LL_WARNING,
3435             "Fatal: AOF method for module data type '%s' tried to "
3436             "call RedisModule_EmitAOF() with wrong format specifiers '%s'",
3437             io->type->name, fmt);
3438         io->error = 1;
3439         errno = EINVAL;
3440         return;
3441     }
3442 
3443     /* Bulk count. */
3444     if (!io->error && rioWriteBulkCount(io->rio,'*',argc) == 0)
3445         io->error = 1;
3446 
3447     /* Arguments. */
3448     for (j = 0; j < argc; j++) {
3449         if (!io->error && rioWriteBulkObject(io->rio,argv[j]) == 0)
3450             io->error = 1;
3451         decrRefCount(argv[j]);
3452     }
3453     zfree(argv);
3454     return;
3455 }
3456 
3457 /* --------------------------------------------------------------------------
3458  * IO context handling
3459  * -------------------------------------------------------------------------- */
3460 
RM_GetContextFromIO(RedisModuleIO * io)3461 RedisModuleCtx *RM_GetContextFromIO(RedisModuleIO *io) {
3462     if (io->ctx) return io->ctx; /* Can't have more than one... */
3463     RedisModuleCtx ctxtemplate = REDISMODULE_CTX_INIT;
3464     io->ctx = zmalloc(sizeof(RedisModuleCtx));
3465     *(io->ctx) = ctxtemplate;
3466     io->ctx->module = io->type->module;
3467     io->ctx->client = NULL;
3468     return io->ctx;
3469 }
3470 
3471 /* Returns a RedisModuleString with the name of the key currently saving or
3472  * loading, when an IO data type callback is called.  There is no guarantee
3473  * that the key name is always available, so this may return NULL.
3474  */
RM_GetKeyNameFromIO(RedisModuleIO * io)3475 const RedisModuleString *RM_GetKeyNameFromIO(RedisModuleIO *io) {
3476     return io->key;
3477 }
3478 
3479 /* --------------------------------------------------------------------------
3480  * Logging
3481  * -------------------------------------------------------------------------- */
3482 
3483 /* This is the low level function implementing both:
3484  *
3485  *      RM_Log()
3486  *      RM_LogIOError()
3487  *
3488  */
RM_LogRaw(RedisModule * module,const char * levelstr,const char * fmt,va_list ap)3489 void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_list ap) {
3490     char msg[LOG_MAX_LEN];
3491     size_t name_len;
3492     int level;
3493 
3494     if (!strcasecmp(levelstr,"debug")) level = LL_DEBUG;
3495     else if (!strcasecmp(levelstr,"verbose")) level = LL_VERBOSE;
3496     else if (!strcasecmp(levelstr,"notice")) level = LL_NOTICE;
3497     else if (!strcasecmp(levelstr,"warning")) level = LL_WARNING;
3498     else level = LL_VERBOSE; /* Default. */
3499 
3500     if (level < server.verbosity) return;
3501 
3502     name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name);
3503     vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap);
3504     serverLogRaw(level,msg);
3505 }
3506 
3507 /* Produces a log message to the standard Redis log, the format accepts
3508  * printf-alike specifiers, while level is a string describing the log
3509  * level to use when emitting the log, and must be one of the following:
3510  *
3511  * * "debug"
3512  * * "verbose"
3513  * * "notice"
3514  * * "warning"
3515  *
3516  * If the specified log level is invalid, verbose is used by default.
3517  * There is a fixed limit to the length of the log line this function is able
3518  * to emit, this limit is not specified but is guaranteed to be more than
3519  * a few lines of text.
3520  */
RM_Log(RedisModuleCtx * ctx,const char * levelstr,const char * fmt,...)3521 void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) {
3522     if (!ctx->module) return;   /* Can only log if module is initialized */
3523 
3524     va_list ap;
3525     va_start(ap, fmt);
3526     RM_LogRaw(ctx->module,levelstr,fmt,ap);
3527     va_end(ap);
3528 }
3529 
3530 /* Log errors from RDB / AOF serialization callbacks.
3531  *
3532  * This function should be used when a callback is returning a critical
3533  * error to the caller since cannot load or save the data for some
3534  * critical reason. */
RM_LogIOError(RedisModuleIO * io,const char * levelstr,const char * fmt,...)3535 void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ...) {
3536     va_list ap;
3537     va_start(ap, fmt);
3538     RM_LogRaw(io->type->module,levelstr,fmt,ap);
3539     va_end(ap);
3540 }
3541 
3542 /* --------------------------------------------------------------------------
3543  * Blocking clients from modules
3544  * -------------------------------------------------------------------------- */
3545 
3546 /* Readable handler for the awake pipe. We do nothing here, the awake bytes
3547  * will be actually read in a more appropriate place in the
3548  * moduleHandleBlockedClients() function that is where clients are actually
3549  * served. */
moduleBlockedClientPipeReadable(aeEventLoop * el,int fd,void * privdata,int mask)3550 void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
3551     UNUSED(el);
3552     UNUSED(fd);
3553     UNUSED(mask);
3554     UNUSED(privdata);
3555 }
3556 
3557 /* This is called from blocked.c in order to unblock a client: may be called
3558  * for multiple reasons while the client is in the middle of being blocked
3559  * because the client is terminated, but is also called for cleanup when a
3560  * client is unblocked in a clean way after replaying.
3561  *
3562  * What we do here is just to set the client to NULL in the redis module
3563  * blocked client handle. This way if the client is terminated while there
3564  * is a pending threaded operation involving the blocked client, we'll know
3565  * that the client no longer exists and no reply callback should be called.
3566  *
3567  * The structure RedisModuleBlockedClient will be always deallocated when
3568  * running the list of clients blocked by a module that need to be unblocked. */
unblockClientFromModule(client * c)3569 void unblockClientFromModule(client *c) {
3570     RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
3571 
3572     /* Call the disconnection callback if any. */
3573     if (bc->disconnect_callback) {
3574         RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3575         ctx.blocked_privdata = bc->privdata;
3576         ctx.module = bc->module;
3577         ctx.client = bc->client;
3578         bc->disconnect_callback(&ctx,bc);
3579         moduleFreeContext(&ctx);
3580     }
3581 
3582     bc->client = NULL;
3583     /* Reset the client for a new query since, for blocking commands implemented
3584      * into modules, we do not it immediately after the command returns (and
3585      * the client blocks) in order to be still able to access the argument
3586      * vector from callbacks. */
3587     resetClient(c);
3588 }
3589 
3590 /* Block a client in the context of a blocking command, returning an handle
3591  * which will be used, later, in order to unblock the client with a call to
3592  * RedisModule_UnblockClient(). The arguments specify callback functions
3593  * and a timeout after which the client is unblocked.
3594  *
3595  * The callbacks are called in the following contexts:
3596  *
3597  *     reply_callback:  called after a successful RedisModule_UnblockClient()
3598  *                      call in order to reply to the client and unblock it.
3599  *
3600  *     reply_timeout:   called when the timeout is reached in order to send an
3601  *                      error to the client.
3602  *
3603  *     free_privdata:   called in order to free the private data that is passed
3604  *                      by RedisModule_UnblockClient() call.
3605  */
RM_BlockClient(RedisModuleCtx * ctx,RedisModuleCmdFunc reply_callback,RedisModuleCmdFunc timeout_callback,void (* free_privdata)(RedisModuleCtx *,void *),long long timeout_ms)3606 RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
3607     client *c = ctx->client;
3608     int islua = c->flags & CLIENT_LUA;
3609     int ismulti = c->flags & CLIENT_MULTI;
3610 
3611     c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
3612     RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
3613 
3614     /* We need to handle the invalid operation of calling modules blocking
3615      * commands from Lua or MULTI. We actually create an already aborted
3616      * (client set to NULL) blocked client handle, and actually reply with
3617      * an error. */
3618     bc->client = (islua || ismulti) ? NULL : c;
3619     bc->module = ctx->module;
3620     bc->reply_callback = reply_callback;
3621     bc->timeout_callback = timeout_callback;
3622     bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
3623     bc->free_privdata = free_privdata;
3624     bc->privdata = NULL;
3625     bc->reply_client = createClient(-1);
3626     bc->reply_client->flags |= CLIENT_MODULE;
3627     bc->dbid = c->db->id;
3628     c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
3629 
3630     if (islua || ismulti) {
3631         c->bpop.module_blocked_handle = NULL;
3632         addReplyError(c, islua ?
3633             "Blocking module command called from Lua script" :
3634             "Blocking module command called from transaction");
3635     } else {
3636         blockClient(c,BLOCKED_MODULE);
3637     }
3638     return bc;
3639 }
3640 
3641 /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
3642  * the reply callbacks to be called in order to reply to the client.
3643  * The 'privdata' argument will be accessible by the reply callback, so
3644  * the caller of this function can pass any value that is needed in order to
3645  * actually reply to the client.
3646  *
3647  * A common usage for 'privdata' is a thread that computes something that
3648  * needs to be passed to the client, included but not limited some slow
3649  * to compute reply or some reply obtained via networking.
3650  *
3651  * Note: this function can be called from threads spawned by the module. */
RM_UnblockClient(RedisModuleBlockedClient * bc,void * privdata)3652 int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
3653     pthread_mutex_lock(&moduleUnblockedClientsMutex);
3654     bc->privdata = privdata;
3655     listAddNodeTail(moduleUnblockedClients,bc);
3656     if (write(server.module_blocked_pipe[1],"A",1) != 1) {
3657         /* Ignore the error, this is best-effort. */
3658     }
3659     pthread_mutex_unlock(&moduleUnblockedClientsMutex);
3660     return REDISMODULE_OK;
3661 }
3662 
3663 /* Abort a blocked client blocking operation: the client will be unblocked
3664  * without firing any callback. */
RM_AbortBlock(RedisModuleBlockedClient * bc)3665 int RM_AbortBlock(RedisModuleBlockedClient *bc) {
3666     bc->reply_callback = NULL;
3667     bc->disconnect_callback = NULL;
3668     return RM_UnblockClient(bc,NULL);
3669 }
3670 
3671 /* Set a callback that will be called if a blocked client disconnects
3672  * before the module has a chance to call RedisModule_UnblockClient()
3673  *
3674  * Usually what you want to do there, is to cleanup your module state
3675  * so that you can call RedisModule_UnblockClient() safely, otherwise
3676  * the client will remain blocked forever if the timeout is large.
3677  *
3678  * Notes:
3679  *
3680  * 1. It is not safe to call Reply* family functions here, it is also
3681  *    useless since the client is gone.
3682  *
3683  * 2. This callback is not called if the client disconnects because of
3684  *    a timeout. In such a case, the client is unblocked automatically
3685  *    and the timeout callback is called.
3686  */
RM_SetDisconnectCallback(RedisModuleBlockedClient * bc,RedisModuleDisconnectFunc callback)3687 void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback) {
3688     bc->disconnect_callback = callback;
3689 }
3690 
3691 /* This function will check the moduleUnblockedClients queue in order to
3692  * call the reply callback and really unblock the client.
3693  *
3694  * Clients end into this list because of calls to RM_UnblockClient(),
3695  * however it is possible that while the module was doing work for the
3696  * blocked client, it was terminated by Redis (for timeout or other reasons).
3697  * When this happens the RedisModuleBlockedClient structure in the queue
3698  * will have the 'client' field set to NULL. */
moduleHandleBlockedClients(void)3699 void moduleHandleBlockedClients(void) {
3700     listNode *ln;
3701     RedisModuleBlockedClient *bc;
3702 
3703     pthread_mutex_lock(&moduleUnblockedClientsMutex);
3704     /* Here we unblock all the pending clients blocked in modules operations
3705      * so we can read every pending "awake byte" in the pipe. */
3706     char buf[1];
3707     while (read(server.module_blocked_pipe[0],buf,1) == 1);
3708     while (listLength(moduleUnblockedClients)) {
3709         ln = listFirst(moduleUnblockedClients);
3710         bc = ln->value;
3711         client *c = bc->client;
3712         listDelNode(moduleUnblockedClients,ln);
3713         pthread_mutex_unlock(&moduleUnblockedClientsMutex);
3714 
3715         /* Release the lock during the loop, as long as we don't
3716          * touch the shared list. */
3717 
3718         /* Call the reply callback if the client is valid and we have
3719          * any callback. */
3720         if (c && bc->reply_callback) {
3721             RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3722             ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
3723             ctx.blocked_privdata = bc->privdata;
3724             ctx.module = bc->module;
3725             ctx.client = bc->client;
3726             ctx.blocked_client = bc;
3727             bc->reply_callback(&ctx,(void**)c->argv,c->argc);
3728             moduleHandlePropagationAfterCommandCallback(&ctx);
3729             moduleFreeContext(&ctx);
3730         }
3731 
3732         /* Free privdata if any. */
3733         if (bc->privdata && bc->free_privdata) {
3734             RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3735             if (c == NULL)
3736                 ctx.flags |= REDISMODULE_CTX_BLOCKED_DISCONNECTED;
3737             ctx.blocked_privdata = bc->privdata;
3738             ctx.module = bc->module;
3739             ctx.client = bc->client;
3740             bc->free_privdata(&ctx,bc->privdata);
3741             moduleFreeContext(&ctx);
3742         }
3743 
3744         /* It is possible that this blocked client object accumulated
3745          * replies to send to the client in a thread safe context.
3746          * We need to glue such replies to the client output buffer and
3747          * free the temporary client we just used for the replies. */
3748         if (c) AddReplyFromClient(c, bc->reply_client);
3749         freeClient(bc->reply_client);
3750 
3751         if (c != NULL) {
3752             /* Before unblocking the client, set the disconnect callback
3753              * to NULL, because if we reached this point, the client was
3754              * properly unblocked by the module. */
3755             bc->disconnect_callback = NULL;
3756             unblockClient(c);
3757             /* Put the client in the list of clients that need to write
3758              * if there are pending replies here. This is needed since
3759              * during a non blocking command the client may receive output. */
3760             if (clientHasPendingReplies(c) &&
3761                 !(c->flags & CLIENT_PENDING_WRITE))
3762             {
3763                 c->flags |= CLIENT_PENDING_WRITE;
3764                 listAddNodeHead(server.clients_pending_write,c);
3765             }
3766         }
3767 
3768         /* Free 'bc' only after unblocking the client, since it is
3769          * referenced in the client blocking context, and must be valid
3770          * when calling unblockClient(). */
3771         zfree(bc);
3772 
3773         /* Lock again before to iterate the loop. */
3774         pthread_mutex_lock(&moduleUnblockedClientsMutex);
3775     }
3776     pthread_mutex_unlock(&moduleUnblockedClientsMutex);
3777 }
3778 
3779 /* Called when our client timed out. After this function unblockClient()
3780  * is called, and it will invalidate the blocked client. So this function
3781  * does not need to do any cleanup. Eventually the module will call the
3782  * API to unblock the client and the memory will be released. */
moduleBlockedClientTimedOut(client * c)3783 void moduleBlockedClientTimedOut(client *c) {
3784     RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
3785     RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3786     ctx.flags |= REDISMODULE_CTX_BLOCKED_TIMEOUT;
3787     ctx.module = bc->module;
3788     ctx.client = bc->client;
3789     ctx.blocked_client = bc;
3790     bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
3791     moduleFreeContext(&ctx);
3792     /* For timeout events, we do not want to call the disconnect callback,
3793      * because the blocked client will be automatically disconnected in
3794      * this case, and the user can still hook using the timeout callback. */
3795     bc->disconnect_callback = NULL;
3796 }
3797 
3798 /* Return non-zero if a module command was called in order to fill the
3799  * reply for a blocked client. */
RM_IsBlockedReplyRequest(RedisModuleCtx * ctx)3800 int RM_IsBlockedReplyRequest(RedisModuleCtx *ctx) {
3801     return (ctx->flags & REDISMODULE_CTX_BLOCKED_REPLY) != 0;
3802 }
3803 
3804 /* Return non-zero if a module command was called in order to fill the
3805  * reply for a blocked client that timed out. */
RM_IsBlockedTimeoutRequest(RedisModuleCtx * ctx)3806 int RM_IsBlockedTimeoutRequest(RedisModuleCtx *ctx) {
3807     return (ctx->flags & REDISMODULE_CTX_BLOCKED_TIMEOUT) != 0;
3808 }
3809 
3810 /* Get the private data set by RedisModule_UnblockClient() */
RM_GetBlockedClientPrivateData(RedisModuleCtx * ctx)3811 void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
3812     return ctx->blocked_privdata;
3813 }
3814 
3815 /* Get the blocked client associated with a given context.
3816  * This is useful in the reply and timeout callbacks of blocked clients,
3817  * before sometimes the module has the blocked client handle references
3818  * around, and wants to cleanup it. */
RM_GetBlockedClientHandle(RedisModuleCtx * ctx)3819 RedisModuleBlockedClient *RM_GetBlockedClientHandle(RedisModuleCtx *ctx) {
3820     return ctx->blocked_client;
3821 }
3822 
3823 /* Return true if when the free callback of a blocked client is called,
3824  * the reason for the client to be unblocked is that it disconnected
3825  * while it was blocked. */
RM_BlockedClientDisconnected(RedisModuleCtx * ctx)3826 int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) {
3827     return (ctx->flags & REDISMODULE_CTX_BLOCKED_DISCONNECTED) != 0;
3828 }
3829 
3830 /* --------------------------------------------------------------------------
3831  * Thread Safe Contexts
3832  * -------------------------------------------------------------------------- */
3833 
3834 /* Return a context which can be used inside threads to make Redis context
3835  * calls with certain modules APIs. If 'bc' is not NULL then the module will
3836  * be bound to a blocked client, and it will be possible to use the
3837  * `RedisModule_Reply*` family of functions to accumulate a reply for when the
3838  * client will be unblocked. Otherwise the thread safe context will be
3839  * detached by a specific client.
3840  *
3841  * To call non-reply APIs, the thread safe context must be prepared with:
3842  *
3843  *     RedisModule_ThreadSafeCallStart(ctx);
3844  *     ... make your call here ...
3845  *     RedisModule_ThreadSafeCallStop(ctx);
3846  *
3847  * This is not needed when using `RedisModule_Reply*` functions, assuming
3848  * that a blocked client was used when the context was created, otherwise
3849  * no RedisModule_Reply* call should be made at all.
3850  *
3851  * TODO: thread safe contexts do not inherit the blocked client
3852  * selected database. */
RM_GetThreadSafeContext(RedisModuleBlockedClient * bc)3853 RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
3854     RedisModuleCtx *ctx = zmalloc(sizeof(*ctx));
3855     RedisModuleCtx empty = REDISMODULE_CTX_INIT;
3856     memcpy(ctx,&empty,sizeof(empty));
3857     if (bc) {
3858         ctx->blocked_client = bc;
3859         ctx->module = bc->module;
3860     }
3861     ctx->flags |= REDISMODULE_CTX_THREAD_SAFE;
3862     /* Even when the context is associated with a blocked client, we can't
3863      * access it safely from another thread, so we create a fake client here
3864      * in order to keep things like the currently selected database and similar
3865      * things. */
3866     ctx->client = createClient(-1);
3867     if (bc) {
3868         selectDb(ctx->client,bc->dbid);
3869         ctx->client->id = bc->client->id;
3870     }
3871     return ctx;
3872 }
3873 
3874 /* Release a thread safe context. */
RM_FreeThreadSafeContext(RedisModuleCtx * ctx)3875 void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
3876     moduleFreeContext(ctx);
3877     zfree(ctx);
3878 }
3879 
3880 /* Acquire the server lock before executing a thread safe API call.
3881  * This is not needed for `RedisModule_Reply*` calls when there is
3882  * a blocked client connected to the thread safe context. */
RM_ThreadSafeContextLock(RedisModuleCtx * ctx)3883 void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
3884     UNUSED(ctx);
3885     moduleAcquireGIL();
3886 }
3887 
3888 /* Release the server lock after a thread safe API call was executed. */
RM_ThreadSafeContextUnlock(RedisModuleCtx * ctx)3889 void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
3890     UNUSED(ctx);
3891     moduleReleaseGIL();
3892 }
3893 
moduleAcquireGIL(void)3894 void moduleAcquireGIL(void) {
3895     pthread_mutex_lock(&moduleGIL);
3896 }
3897 
moduleReleaseGIL(void)3898 void moduleReleaseGIL(void) {
3899     pthread_mutex_unlock(&moduleGIL);
3900 }
3901 
3902 
3903 /* --------------------------------------------------------------------------
3904  * Module Keyspace Notifications API
3905  * -------------------------------------------------------------------------- */
3906 
3907 /* Subscribe to keyspace notifications. This is a low-level version of the
3908  * keyspace-notifications API. A module can register callbacks to be notified
3909  * when keyspce events occur.
3910  *
3911  * Notification events are filtered by their type (string events, set events,
3912  * etc), and the subscriber callback receives only events that match a specific
3913  * mask of event types.
3914  *
3915  * When subscribing to notifications with RedisModule_SubscribeToKeyspaceEvents
3916  * the module must provide an event type-mask, denoting the events the subscriber
3917  * is interested in. This can be an ORed mask of any of the following flags:
3918  *
3919  *  - REDISMODULE_NOTIFY_GENERIC: Generic commands like DEL, EXPIRE, RENAME
3920  *  - REDISMODULE_NOTIFY_STRING: String events
3921  *  - REDISMODULE_NOTIFY_LIST: List events
3922  *  - REDISMODULE_NOTIFY_SET: Set events
3923  *  - REDISMODULE_NOTIFY_HASH: Hash events
3924  *  - REDISMODULE_NOTIFY_ZSET: Sorted Set events
3925  *  - REDISMODULE_NOTIFY_EXPIRED: Expiration events
3926  *  - REDISMODULE_NOTIFY_EVICTED: Eviction events
3927  *  - REDISMODULE_NOTIFY_STREAM: Stream events
3928  *  - REDISMODULE_NOTIFY_ALL: All events
3929  *
3930  * We do not distinguish between key events and keyspace events, and it is up
3931  * to the module to filter the actions taken based on the key.
3932  *
3933  * The subscriber signature is:
3934  *
3935  *   int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type,
3936  *                                       const char *event,
3937  *                                       RedisModuleString *key);
3938  *
3939  * `type` is the event type bit, that must match the mask given at registration
3940  * time. The event string is the actual command being executed, and key is the
3941  * relevant Redis key.
3942  *
3943  * Notification callback gets executed with a redis context that can not be
3944  * used to send anything to the client, and has the db number where the event
3945  * occurred as its selected db number.
3946  *
3947  * Notice that it is not necessary to enable notifications in redis.conf for
3948  * module notifications to work.
3949  *
3950  * Warning: the notification callbacks are performed in a synchronous manner,
3951  * so notification callbacks must to be fast, or they would slow Redis down.
3952  * If you need to take long actions, use threads to offload them.
3953  *
3954  * See https://redis.io/topics/notifications for more information.
3955  */
RM_SubscribeToKeyspaceEvents(RedisModuleCtx * ctx,int types,RedisModuleNotificationFunc callback)3956 int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) {
3957     RedisModuleKeyspaceSubscriber *sub = zmalloc(sizeof(*sub));
3958     sub->module = ctx->module;
3959     sub->event_mask = types;
3960     sub->notify_callback = callback;
3961     sub->active = 0;
3962 
3963     listAddNodeTail(moduleKeyspaceSubscribers, sub);
3964     return REDISMODULE_OK;
3965 }
3966 
3967 /* Dispatcher for keyspace notifications to module subscriber functions.
3968  * This gets called  only if at least one module requested to be notified on
3969  * keyspace notifications */
moduleNotifyKeyspaceEvent(int type,const char * event,robj * key,int dbid)3970 void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) {
3971     /* Don't do anything if there aren't any subscribers */
3972     if (listLength(moduleKeyspaceSubscribers) == 0) return;
3973 
3974     listIter li;
3975     listNode *ln;
3976     listRewind(moduleKeyspaceSubscribers,&li);
3977 
3978     /* Remove irrelevant flags from the type mask */
3979     type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE);
3980 
3981     while((ln = listNext(&li))) {
3982         RedisModuleKeyspaceSubscriber *sub = ln->value;
3983         /* Only notify subscribers on events matching they registration,
3984          * and avoid subscribers triggering themselves */
3985         if ((sub->event_mask & type) && sub->active == 0) {
3986             RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3987             ctx.module = sub->module;
3988             ctx.client = moduleFreeContextReusedClient;
3989             selectDb(ctx.client, dbid);
3990 
3991             /* mark the handler as active to avoid reentrant loops.
3992              * If the subscriber performs an action triggering itself,
3993              * it will not be notified about it. */
3994             sub->active = 1;
3995             sub->notify_callback(&ctx, type, event, key);
3996             sub->active = 0;
3997             moduleFreeContext(&ctx);
3998         }
3999     }
4000 }
4001 
4002 /* Unsubscribe any notification subscribers this module has upon unloading */
moduleUnsubscribeNotifications(RedisModule * module)4003 void moduleUnsubscribeNotifications(RedisModule *module) {
4004     listIter li;
4005     listNode *ln;
4006     listRewind(moduleKeyspaceSubscribers,&li);
4007     while((ln = listNext(&li))) {
4008         RedisModuleKeyspaceSubscriber *sub = ln->value;
4009         if (sub->module == module) {
4010             listDelNode(moduleKeyspaceSubscribers, ln);
4011             zfree(sub);
4012         }
4013     }
4014 }
4015 
4016 /* --------------------------------------------------------------------------
4017  * Modules Cluster API
4018  * -------------------------------------------------------------------------- */
4019 
4020 /* The Cluster message callback function pointer type. */
4021 typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
4022 
4023 /* This structure identifies a registered caller: it must match a given module
4024  * ID, for a given message type. The callback function is just the function
4025  * that was registered as receiver. */
4026 typedef struct moduleClusterReceiver {
4027     uint64_t module_id;
4028     RedisModuleClusterMessageReceiver callback;
4029     struct RedisModule *module;
4030     struct moduleClusterReceiver *next;
4031 } moduleClusterReceiver;
4032 
4033 typedef struct moduleClusterNodeInfo {
4034     int flags;
4035     char ip[NET_IP_STR_LEN];
4036     int port;
4037     char master_id[40]; /* Only if flags & REDISMODULE_NODE_MASTER is true. */
4038 } mdouleClusterNodeInfo;
4039 
4040 /* We have an array of message types: each bucket is a linked list of
4041  * configured receivers. */
4042 static moduleClusterReceiver *clusterReceivers[UINT8_MAX];
4043 
4044 /* Dispatch the message to the right module receiver. */
moduleCallClusterReceivers(const char * sender_id,uint64_t module_id,uint8_t type,const unsigned char * payload,uint32_t len)4045 void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len) {
4046     moduleClusterReceiver *r = clusterReceivers[type];
4047     while(r) {
4048         if (r->module_id == module_id) {
4049             RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
4050             ctx.module = r->module;
4051             ctx.client = moduleFreeContextReusedClient;
4052             selectDb(ctx.client, 0);
4053             r->callback(&ctx,sender_id,type,payload,len);
4054             moduleFreeContext(&ctx);
4055             return;
4056         }
4057         r = r->next;
4058     }
4059 }
4060 
4061 /* Register a callback receiver for cluster messages of type 'type'. If there
4062  * was already a registered callback, this will replace the callback function
4063  * with the one provided, otherwise if the callback is set to NULL and there
4064  * is already a callback for this function, the callback is unregistered
4065  * (so this API call is also used in order to delete the receiver). */
RM_RegisterClusterMessageReceiver(RedisModuleCtx * ctx,uint8_t type,RedisModuleClusterMessageReceiver callback)4066 void RM_RegisterClusterMessageReceiver(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback) {
4067     if (!server.cluster_enabled) return;
4068 
4069     uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0);
4070     moduleClusterReceiver *r = clusterReceivers[type], *prev = NULL;
4071     while(r) {
4072         if (r->module_id == module_id) {
4073             /* Found! Set or delete. */
4074             if (callback) {
4075                 r->callback = callback;
4076             } else {
4077                 /* Delete the receiver entry if the user is setting
4078                  * it to NULL. Just unlink the receiver node from the
4079                  * linked list. */
4080                 if (prev)
4081                     prev->next = r->next;
4082                 else
4083                     clusterReceivers[type]->next = r->next;
4084                 zfree(r);
4085             }
4086             return;
4087         }
4088         prev = r;
4089         r = r->next;
4090     }
4091 
4092     /* Not found, let's add it. */
4093     if (callback) {
4094         r = zmalloc(sizeof(*r));
4095         r->module_id = module_id;
4096         r->module = ctx->module;
4097         r->callback = callback;
4098         r->next = clusterReceivers[type];
4099         clusterReceivers[type] = r;
4100     }
4101 }
4102 
4103 /* Send a message to all the nodes in the cluster if `target` is NULL, otherwise
4104  * at the specified target, which is a REDISMODULE_NODE_ID_LEN bytes node ID, as
4105  * returned by the receiver callback or by the nodes iteration functions.
4106  *
4107  * The function returns REDISMODULE_OK if the message was successfully sent,
4108  * otherwise if the node is not connected or such node ID does not map to any
4109  * known cluster node, REDISMODULE_ERR is returned. */
RM_SendClusterMessage(RedisModuleCtx * ctx,char * target_id,uint8_t type,unsigned char * msg,uint32_t len)4110 int RM_SendClusterMessage(RedisModuleCtx *ctx, char *target_id, uint8_t type, unsigned char *msg, uint32_t len) {
4111     if (!server.cluster_enabled) return REDISMODULE_ERR;
4112     uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0);
4113     if (clusterSendModuleMessageToTarget(target_id,module_id,type,msg,len) == C_OK)
4114         return REDISMODULE_OK;
4115     else
4116         return REDISMODULE_ERR;
4117 }
4118 
4119 /* Return an array of string pointers, each string pointer points to a cluster
4120  * node ID of exactly REDISMODULE_NODE_ID_SIZE bytes (without any null term).
4121  * The number of returned node IDs is stored into `*numnodes`.
4122  * However if this function is called by a module not running an a Redis
4123  * instance with Redis Cluster enabled, NULL is returned instead.
4124  *
4125  * The IDs returned can be used with RedisModule_GetClusterNodeInfo() in order
4126  * to get more information about single nodes.
4127  *
4128  * The array returned by this function must be freed using the function
4129  * RedisModule_FreeClusterNodesList().
4130  *
4131  * Example:
4132  *
4133  *     size_t count, j;
4134  *     char **ids = RedisModule_GetClusterNodesList(ctx,&count);
4135  *     for (j = 0; j < count; j++) {
4136  *         RedisModule_Log("notice","Node %.*s",
4137  *             REDISMODULE_NODE_ID_LEN,ids[j]);
4138  *     }
4139  *     RedisModule_FreeClusterNodesList(ids);
4140  */
RM_GetClusterNodesList(RedisModuleCtx * ctx,size_t * numnodes)4141 char **RM_GetClusterNodesList(RedisModuleCtx *ctx, size_t *numnodes) {
4142     UNUSED(ctx);
4143 
4144     if (!server.cluster_enabled) return NULL;
4145     size_t count = dictSize(server.cluster->nodes);
4146     char **ids = zmalloc((count+1)*REDISMODULE_NODE_ID_LEN);
4147     dictIterator *di = dictGetIterator(server.cluster->nodes);
4148     dictEntry *de;
4149     int j = 0;
4150     while((de = dictNext(di)) != NULL) {
4151         clusterNode *node = dictGetVal(de);
4152         if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) continue;
4153         ids[j] = zmalloc(REDISMODULE_NODE_ID_LEN);
4154         memcpy(ids[j],node->name,REDISMODULE_NODE_ID_LEN);
4155         j++;
4156     }
4157     *numnodes = j;
4158     ids[j] = NULL; /* Null term so that FreeClusterNodesList does not need
4159                     * to also get the count argument. */
4160     dictReleaseIterator(di);
4161     return ids;
4162 }
4163 
/* Release a node list obtained with RedisModule_GetClusterNodesList():
 * every node ID up to the NULL terminator is freed, then the array itself.
 * Passing NULL is allowed and does nothing. */
void RM_FreeClusterNodesList(char **ids) {
    if (!ids) return;

    char **cursor = ids;
    while (*cursor != NULL) {
        zfree(*cursor);
        cursor++;
    }
    zfree(ids);
}
4170 
4171 /* Return this node ID (REDISMODULE_CLUSTER_ID_LEN bytes) or NULL if the cluster
4172  * is disabled. */
RM_GetMyClusterID(void)4173 const char *RM_GetMyClusterID(void) {
4174     if (!server.cluster_enabled) return NULL;
4175     return server.cluster->myself->name;
4176 }
4177 
4178 /* Return the number of nodes in the cluster, regardless of their state
4179  * (handshake, noaddress, ...) so that the number of active nodes may actually
4180  * be smaller, but not greater than this number. If the instance is not in
4181  * cluster mode, zero is returned. */
RM_GetClusterSize(void)4182 size_t RM_GetClusterSize(void) {
4183     if (!server.cluster_enabled) return 0;
4184     return dictSize(server.cluster->nodes);
4185 }
4186 
4187 /* Populate the specified info for the node having as ID the specified 'id',
4188  * then returns REDISMODULE_OK. Otherwise if the node ID does not exist from
4189  * the POV of this local node, REDISMODULE_ERR is returned.
4190  *
4191  * The arguments ip, master_id, port and flags can be NULL in case we don't
4192  * need to populate back certain info. If an ip and master_id (only populated
4193  * if the instance is a slave) are specified, they point to buffers holding
4194  * at least REDISMODULE_NODE_ID_LEN bytes. The strings written back as ip
4195  * and master_id are not null terminated.
4196  *
4197  * The list of flags reported is the following:
4198  *
4199  * * REDISMODULE_NODE_MYSELF        This node
4200  * * REDISMODULE_NODE_MASTER        The node is a master
4201  * * REDISMODULE_NODE_SLAVE         The node is a replica
4202  * * REDISMODULE_NODE_PFAIL         We see the node as failing
4203  * * REDISMODULE_NODE_FAIL          The cluster agrees the node is failing
4204  * * REDISMODULE_NODE_NOFAILOVER    The slave is configured to never failover
4205  */
4206 
4207 clusterNode *clusterLookupNode(const char *name); /* We need access to internals */
4208 
RM_GetClusterNodeInfo(RedisModuleCtx * ctx,const char * id,char * ip,char * master_id,int * port,int * flags)4209 int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *master_id, int *port, int *flags) {
4210     UNUSED(ctx);
4211 
4212     clusterNode *node = clusterLookupNode(id);
4213     if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
4214         return REDISMODULE_ERR;
4215 
4216     if (ip) memcpy(ip,node->name,REDISMODULE_NODE_ID_LEN);
4217 
4218     if (master_id) {
4219         /* If the information is not available, the function will set the
4220          * field to zero bytes, so that when the field can't be populated the
4221          * function kinda remains predictable. */
4222         if (node->flags & CLUSTER_NODE_MASTER && node->slaveof)
4223             memcpy(master_id,node->slaveof->name,REDISMODULE_NODE_ID_LEN);
4224         else
4225             memset(master_id,0,REDISMODULE_NODE_ID_LEN);
4226     }
4227     if (port) *port = node->port;
4228 
4229     /* As usually we have to remap flags for modules, in order to ensure
4230      * we can provide binary compatibility. */
4231     if (flags) {
4232         *flags = 0;
4233         if (node->flags & CLUSTER_NODE_MYSELF) *flags |= REDISMODULE_NODE_MYSELF;
4234         if (node->flags & CLUSTER_NODE_MASTER) *flags |= REDISMODULE_NODE_MASTER;
4235         if (node->flags & CLUSTER_NODE_SLAVE) *flags |= REDISMODULE_NODE_SLAVE;
4236         if (node->flags & CLUSTER_NODE_PFAIL) *flags |= REDISMODULE_NODE_PFAIL;
4237         if (node->flags & CLUSTER_NODE_FAIL) *flags |= REDISMODULE_NODE_FAIL;
4238         if (node->flags & CLUSTER_NODE_NOFAILOVER) *flags |= REDISMODULE_NODE_NOFAILOVER;
4239     }
4240     return REDISMODULE_OK;
4241 }
4242 
4243 /* Set Redis Cluster flags in order to change the normal behavior of
4244  * Redis Cluster, especially with the goal of disabling certain functions.
4245  * This is useful for modules that use the Cluster API in order to create
4246  * a different distributed system, but still want to use the Redis Cluster
4247  * message bus. Flags that can be set:
4248  *
4249  *  CLUSTER_MODULE_FLAG_NO_FAILOVER
4250  *  CLUSTER_MODULE_FLAG_NO_REDIRECTION
4251  *
4252  * With the following effects:
4253  *
4254  *  NO_FAILOVER: prevent Redis Cluster slaves to failover a failing master.
4255  *               Also disables the replica migration feature.
4256  *
4257  *  NO_REDIRECTION: Every node will accept any key, without trying to perform
4258  *                  partitioning according to the user Redis Cluster algorithm.
4259  *                  Slots informations will still be propagated across the
4260  *                  cluster, but without effects. */
RM_SetClusterFlags(RedisModuleCtx * ctx,uint64_t flags)4261 void RM_SetClusterFlags(RedisModuleCtx *ctx, uint64_t flags) {
4262     UNUSED(ctx);
4263     if (flags & REDISMODULE_CLUSTER_FLAG_NO_FAILOVER)
4264         server.cluster_module_flags |= CLUSTER_MODULE_FLAG_NO_FAILOVER;
4265     if (flags & REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION)
4266         server.cluster_module_flags |= CLUSTER_MODULE_FLAG_NO_REDIRECTION;
4267 }
4268 
/* --------------------------------------------------------------------------
 * Modules Timers API
 *
 * Module timers are a high-precision "green timers" abstraction where
 * every module can register even millions of timers without problems, even if
 * the actual event loop will just have a single timer that is used to awake the
 * module timers subsystem in order to process the next event.
 *
 * All the timers are stored in a radix tree, ordered by expire time. When
 * the main Redis event loop timer callback is called, we try to process all
 * the timers already expired one after the other. Then we re-enter the event
 * loop registering a timer that will expire when the next module timer to
 * process expires.
 *
 * Every time the list of active timers drops to zero, we unregister the
 * main event loop timer, so that there is no overhead when this feature is
 * not used.
 * -------------------------------------------------------------------------- */
4287 
4288 static rax *Timers;     /* The radix tree of all the timers sorted by expire. */
4289 long long aeTimer = -1; /* Main event loop (ae.c) timer identifier. */
4290 
4291 typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
4292 
4293 /* The timer descriptor, stored as value in the radix tree. */
4294 typedef struct RedisModuleTimer {
4295     RedisModule *module;                /* Module reference. */
4296     RedisModuleTimerProc callback;      /* The callback to invoke on expire. */
4297     void *data;                         /* Private data for the callback. */
4298     int dbid;                           /* Database number selected by the original client. */
4299 } RedisModuleTimer;
4300 
4301 /* This is the timer handler that is called by the main event loop. We schedule
4302  * this timer to be called when the nearest of our module timers will expire. */
moduleTimerHandler(struct aeEventLoop * eventLoop,long long id,void * clientData)4303 int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *clientData) {
4304     UNUSED(eventLoop);
4305     UNUSED(id);
4306     UNUSED(clientData);
4307 
4308     /* To start let's try to fire all the timers already expired. */
4309     raxIterator ri;
4310     raxStart(&ri,Timers);
4311     uint64_t now = ustime();
4312     long long next_period = 0;
4313     while(1) {
4314         raxSeek(&ri,"^",NULL,0);
4315         if (!raxNext(&ri)) break;
4316         uint64_t expiretime;
4317         memcpy(&expiretime,ri.key,sizeof(expiretime));
4318         expiretime = ntohu64(expiretime);
4319         if (now >= expiretime) {
4320             RedisModuleTimer *timer = ri.data;
4321             RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
4322 
4323             ctx.module = timer->module;
4324             ctx.client = moduleFreeContextReusedClient;
4325             selectDb(ctx.client, timer->dbid);
4326             timer->callback(&ctx,timer->data);
4327             moduleFreeContext(&ctx);
4328             raxRemove(Timers,(unsigned char*)ri.key,ri.key_len,NULL);
4329             zfree(timer);
4330         } else {
4331             next_period = (expiretime-now)/1000; /* Scale to milliseconds. */
4332             break;
4333         }
4334     }
4335     raxStop(&ri);
4336 
4337     /* Reschedule the next timer or cancel it. */
4338     if (next_period <= 0) next_period = 1;
4339     return (raxSize(Timers) > 0) ? next_period : AE_NOMORE;
4340 }
4341 
4342 /* Create a new timer that will fire after `period` milliseconds, and will call
4343  * the specified function using `data` as argument. The returned timer ID can be
4344  * used to get information from the timer or to stop it before it fires. */
RM_CreateTimer(RedisModuleCtx * ctx,mstime_t period,RedisModuleTimerProc callback,void * data)4345 RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) {
4346     RedisModuleTimer *timer = zmalloc(sizeof(*timer));
4347     timer->module = ctx->module;
4348     timer->callback = callback;
4349     timer->data = data;
4350     timer->dbid = ctx->client->db->id;
4351     uint64_t expiretime = ustime()+period*1000;
4352     uint64_t key;
4353 
4354     while(1) {
4355         key = htonu64(expiretime);
4356         if (raxFind(Timers, (unsigned char*)&key,sizeof(key)) == raxNotFound) {
4357             raxInsert(Timers,(unsigned char*)&key,sizeof(key),timer,NULL);
4358             break;
4359         } else {
4360             expiretime++;
4361         }
4362     }
4363 
4364     /* We need to install the main event loop timer if it's not already
4365      * installed, or we may need to refresh its period if we just installed
4366      * a timer that will expire sooner than any other else. */
4367     if (aeTimer != -1) {
4368         raxIterator ri;
4369         raxStart(&ri,Timers);
4370         raxSeek(&ri,"^",NULL,0);
4371         raxNext(&ri);
4372         if (memcmp(ri.key,&key,sizeof(key)) == 0) {
4373             /* This is the first key, we need to re-install the timer according
4374              * to the just added event. */
4375             aeDeleteTimeEvent(server.el,aeTimer);
4376             aeTimer = -1;
4377         }
4378         raxStop(&ri);
4379     }
4380 
4381     /* If we have no main timer (the old one was invalidated, or this is the
4382      * first module timer we have), install one. */
4383     if (aeTimer == -1)
4384         aeTimer = aeCreateTimeEvent(server.el,period,moduleTimerHandler,NULL,NULL);
4385 
4386     return key;
4387 }
4388 
4389 /* Stop a timer, returns REDISMODULE_OK if the timer was found, belonged to the
4390  * calling module, and was stopped, otherwise REDISMODULE_ERR is returned.
4391  * If not NULL, the data pointer is set to the value of the data argument when
4392  * the timer was created. */
RM_StopTimer(RedisModuleCtx * ctx,RedisModuleTimerID id,void ** data)4393 int RM_StopTimer(RedisModuleCtx *ctx, RedisModuleTimerID id, void **data) {
4394     RedisModuleTimer *timer = raxFind(Timers,(unsigned char*)&id,sizeof(id));
4395     if (timer == raxNotFound || timer->module != ctx->module)
4396         return REDISMODULE_ERR;
4397     if (data) *data = timer->data;
4398     raxRemove(Timers,(unsigned char*)&id,sizeof(id),NULL);
4399     zfree(timer);
4400     return REDISMODULE_OK;
4401 }
4402 
4403 /* Obtain information about a timer: its remaining time before firing
4404  * (in milliseconds), and the private data pointer associated with the timer.
4405  * If the timer specified does not exist or belongs to a different module
4406  * no information is returned and the function returns REDISMODULE_ERR, otherwise
4407  * REDISMODULE_OK is returned. The arguments remaining or data can be NULL if
4408  * the caller does not need certain information. */
RM_GetTimerInfo(RedisModuleCtx * ctx,RedisModuleTimerID id,uint64_t * remaining,void ** data)4409 int RM_GetTimerInfo(RedisModuleCtx *ctx, RedisModuleTimerID id, uint64_t *remaining, void **data) {
4410     RedisModuleTimer *timer = raxFind(Timers,(unsigned char*)&id,sizeof(id));
4411     if (timer == raxNotFound || timer->module != ctx->module)
4412         return REDISMODULE_ERR;
4413     if (remaining) {
4414         int64_t rem = ntohu64(id)-ustime();
4415         if (rem < 0) rem = 0;
4416         *remaining = rem/1000; /* Scale to milliseconds. */
4417     }
4418     if (data) *data = timer->data;
4419     return REDISMODULE_OK;
4420 }
4421 
4422 /* --------------------------------------------------------------------------
4423  * Modules Dictionary API
4424  *
4425  * Implements a sorted dictionary (actually backed by a radix tree) with
4426  * the usual get / set / del / num-items API, together with an iterator
4427  * capable of going back and forth.
4428  * -------------------------------------------------------------------------- */
4429 
4430 /* Create a new dictionary. The 'ctx' pointer can be the current module context
4431  * or NULL, depending on what you want. Please follow the following rules:
4432  *
4433  * 1. Use a NULL context if you plan to retain a reference to this dictionary
4434  *    that will survive the time of the module callback where you created it.
4435  * 2. Use a NULL context if no context is available at the time you are creating
4436  *    the dictionary (of course...).
4437  * 3. However use the current callback context as 'ctx' argument if the
4438  *    dictionary time to live is just limited to the callback scope. In this
4439  *    case, if enabled, you can enjoy the automatic memory management that will
4440  *    reclaim the dictionary memory, as well as the strings returned by the
4441  *    Next / Prev dictionary iterator calls.
4442  */
RM_CreateDict(RedisModuleCtx * ctx)4443 RedisModuleDict *RM_CreateDict(RedisModuleCtx *ctx) {
4444     struct RedisModuleDict *d = zmalloc(sizeof(*d));
4445     d->rax = raxNew();
4446     if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_DICT,d);
4447     return d;
4448 }
4449 
4450 /* Free a dictionary created with RM_CreateDict(). You need to pass the
4451  * context pointer 'ctx' only if the dictionary was created using the
4452  * context instead of passing NULL. */
RM_FreeDict(RedisModuleCtx * ctx,RedisModuleDict * d)4453 void RM_FreeDict(RedisModuleCtx *ctx, RedisModuleDict *d) {
4454     if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_DICT,d);
4455     raxFree(d->rax);
4456     zfree(d);
4457 }
4458 
4459 /* Return the size of the dictionary (number of keys). */
RM_DictSize(RedisModuleDict * d)4460 uint64_t RM_DictSize(RedisModuleDict *d) {
4461     return raxSize(d->rax);
4462 }
4463 
4464 /* Store the specified key into the dictionary, setting its value to the
4465  * pointer 'ptr'. If the key was added with success, since it did not
4466  * already exist, REDISMODULE_OK is returned. Otherwise if the key already
4467  * exists the function returns REDISMODULE_ERR. */
RM_DictSetC(RedisModuleDict * d,void * key,size_t keylen,void * ptr)4468 int RM_DictSetC(RedisModuleDict *d, void *key, size_t keylen, void *ptr) {
4469     int retval = raxTryInsert(d->rax,key,keylen,ptr,NULL);
4470     return (retval == 1) ? REDISMODULE_OK : REDISMODULE_ERR;
4471 }
4472 
4473 /* Like RedisModule_DictSetC() but will replace the key with the new
4474  * value if the key already exists. */
RM_DictReplaceC(RedisModuleDict * d,void * key,size_t keylen,void * ptr)4475 int RM_DictReplaceC(RedisModuleDict *d, void *key, size_t keylen, void *ptr) {
4476     int retval = raxInsert(d->rax,key,keylen,ptr,NULL);
4477     return (retval == 1) ? REDISMODULE_OK : REDISMODULE_ERR;
4478 }
4479 
4480 /* Like RedisModule_DictSetC() but takes the key as a RedisModuleString. */
RM_DictSet(RedisModuleDict * d,RedisModuleString * key,void * ptr)4481 int RM_DictSet(RedisModuleDict *d, RedisModuleString *key, void *ptr) {
4482     return RM_DictSetC(d,key->ptr,sdslen(key->ptr),ptr);
4483 }
4484 
4485 /* Like RedisModule_DictReplaceC() but takes the key as a RedisModuleString. */
RM_DictReplace(RedisModuleDict * d,RedisModuleString * key,void * ptr)4486 int RM_DictReplace(RedisModuleDict *d, RedisModuleString *key, void *ptr) {
4487     return RM_DictReplaceC(d,key->ptr,sdslen(key->ptr),ptr);
4488 }
4489 
4490 /* Return the value stored at the specified key. The function returns NULL
4491  * both in the case the key does not exist, or if you actually stored
4492  * NULL at key. So, optionally, if the 'nokey' pointer is not NULL, it will
4493  * be set by reference to 1 if the key does not exist, or to 0 if the key
4494  * exists. */
RM_DictGetC(RedisModuleDict * d,void * key,size_t keylen,int * nokey)4495 void *RM_DictGetC(RedisModuleDict *d, void *key, size_t keylen, int *nokey) {
4496     void *res = raxFind(d->rax,key,keylen);
4497     if (nokey) *nokey = (res == raxNotFound);
4498     return (res == raxNotFound) ? NULL : res;
4499 }
4500 
4501 /* Like RedisModule_DictGetC() but takes the key as a RedisModuleString. */
RM_DictGet(RedisModuleDict * d,RedisModuleString * key,int * nokey)4502 void *RM_DictGet(RedisModuleDict *d, RedisModuleString *key, int *nokey) {
4503     return RM_DictGetC(d,key->ptr,sdslen(key->ptr),nokey);
4504 }
4505 
4506 /* Remove the specified key from the dictionary, returning REDISMODULE_OK if
4507  * the key was found and delted, or REDISMODULE_ERR if instead there was
4508  * no such key in the dictionary. When the operation is successful, if
4509  * 'oldval' is not NULL, then '*oldval' is set to the value stored at the
4510  * key before it was deleted. Using this feature it is possible to get
4511  * a pointer to the value (for instance in order to release it), without
4512  * having to call RedisModule_DictGet() before deleting the key. */
RM_DictDelC(RedisModuleDict * d,void * key,size_t keylen,void * oldval)4513 int RM_DictDelC(RedisModuleDict *d, void *key, size_t keylen, void *oldval) {
4514     int retval = raxRemove(d->rax,key,keylen,oldval);
4515     return retval ? REDISMODULE_OK : REDISMODULE_ERR;
4516 }
4517 
4518 /* Like RedisModule_DictDelC() but gets the key as a RedisModuleString. */
RM_DictDel(RedisModuleDict * d,RedisModuleString * key,void * oldval)4519 int RM_DictDel(RedisModuleDict *d, RedisModuleString *key, void *oldval) {
4520     return RM_DictDelC(d,key->ptr,sdslen(key->ptr),oldval);
4521 }
4522 
4523 /* Return an interator, setup in order to start iterating from the specified
4524  * key by applying the operator 'op', which is just a string specifying the
4525  * comparison operator to use in order to seek the first element. The
4526  * operators avalable are:
4527  *
4528  * "^"   -- Seek the first (lexicographically smaller) key.
4529  * "$"   -- Seek the last  (lexicographically biffer) key.
4530  * ">"   -- Seek the first element greter than the specified key.
4531  * ">="  -- Seek the first element greater or equal than the specified key.
4532  * "<"   -- Seek the first element smaller than the specified key.
4533  * "<="  -- Seek the first element smaller or equal than the specified key.
4534  * "=="  -- Seek the first element matching exactly the specified key.
4535  *
4536  * Note that for "^" and "$" the passed key is not used, and the user may
4537  * just pass NULL with a length of 0.
4538  *
4539  * If the element to start the iteration cannot be seeked based on the
4540  * key and operator passed, RedisModule_DictNext() / Prev() will just return
4541  * REDISMODULE_ERR at the first call, otherwise they'll produce elements.
4542  */
RM_DictIteratorStartC(RedisModuleDict * d,const char * op,void * key,size_t keylen)4543 RedisModuleDictIter *RM_DictIteratorStartC(RedisModuleDict *d, const char *op, void *key, size_t keylen) {
4544     RedisModuleDictIter *di = zmalloc(sizeof(*di));
4545     di->dict = d;
4546     raxStart(&di->ri,d->rax);
4547     raxSeek(&di->ri,op,key,keylen);
4548     return di;
4549 }
4550 
4551 /* Exactly like RedisModule_DictIteratorStartC, but the key is passed as a
4552  * RedisModuleString. */
RM_DictIteratorStart(RedisModuleDict * d,const char * op,RedisModuleString * key)4553 RedisModuleDictIter *RM_DictIteratorStart(RedisModuleDict *d, const char *op, RedisModuleString *key) {
4554     return RM_DictIteratorStartC(d,op,key->ptr,sdslen(key->ptr));
4555 }
4556 
4557 /* Release the iterator created with RedisModule_DictIteratorStart(). This call
4558  * is mandatory otherwise a memory leak is introduced in the module. */
RM_DictIteratorStop(RedisModuleDictIter * di)4559 void RM_DictIteratorStop(RedisModuleDictIter *di) {
4560     raxStop(&di->ri);
4561     zfree(di);
4562 }
4563 
4564 /* After its creation with RedisModule_DictIteratorStart(), it is possible to
4565  * change the currently selected element of the iterator by using this
4566  * API call. The result based on the operator and key is exactly like
4567  * the function RedisModule_DictIteratorStart(), however in this case the
4568  * return value is just REDISMODULE_OK in case the seeked element was found,
4569  * or REDISMODULE_ERR in case it was not possible to seek the specified
4570  * element. It is possible to reseek an iterator as many times as you want. */
RM_DictIteratorReseekC(RedisModuleDictIter * di,const char * op,void * key,size_t keylen)4571 int RM_DictIteratorReseekC(RedisModuleDictIter *di, const char *op, void *key, size_t keylen) {
4572     return raxSeek(&di->ri,op,key,keylen);
4573 }
4574 
4575 /* Like RedisModule_DictIteratorReseekC() but takes the key as as a
4576  * RedisModuleString. */
RM_DictIteratorReseek(RedisModuleDictIter * di,const char * op,RedisModuleString * key)4577 int RM_DictIteratorReseek(RedisModuleDictIter *di, const char *op, RedisModuleString *key) {
4578     return RM_DictIteratorReseekC(di,op,key->ptr,sdslen(key->ptr));
4579 }
4580 
4581 /* Return the current item of the dictionary iterator 'di' and steps to the
4582  * next element. If the iterator already yield the last element and there
4583  * are no other elements to return, NULL is returned, otherwise a pointer
4584  * to a string representing the key is provided, and the '*keylen' length
4585  * is set by reference (if keylen is not NULL). The '*dataptr', if not NULL
4586  * is set to the value of the pointer stored at the returned key as auxiliary
4587  * data (as set by the RedisModule_DictSet API).
4588  *
4589  * Usage example:
4590  *
4591  *      ... create the iterator here ...
4592  *      char *key;
4593  *      void *data;
4594  *      while((key = RedisModule_DictNextC(iter,&keylen,&data)) != NULL) {
4595  *          printf("%.*s %p\n", (int)keylen, key, data);
4596  *      }
4597  *
4598  * The returned pointer is of type void because sometimes it makes sense
4599  * to cast it to a char* sometimes to an unsigned char* depending on the
4600  * fact it contains or not binary data, so this API ends being more
4601  * comfortable to use.
4602  *
4603  * The validity of the returned pointer is until the next call to the
4604  * next/prev iterator step. Also the pointer is no longer valid once the
4605  * iterator is released. */
RM_DictNextC(RedisModuleDictIter * di,size_t * keylen,void ** dataptr)4606 void *RM_DictNextC(RedisModuleDictIter *di, size_t *keylen, void **dataptr) {
4607     if (!raxNext(&di->ri)) return NULL;
4608     if (keylen) *keylen = di->ri.key_len;
4609     if (dataptr) *dataptr = di->ri.data;
4610     return di->ri.key;
4611 }
4612 
4613 /* This function is exactly like RedisModule_DictNext() but after returning
4614  * the currently selected element in the iterator, it selects the previous
4615  * element (laxicographically smaller) instead of the next one. */
RM_DictPrevC(RedisModuleDictIter * di,size_t * keylen,void ** dataptr)4616 void *RM_DictPrevC(RedisModuleDictIter *di, size_t *keylen, void **dataptr) {
4617     if (!raxPrev(&di->ri)) return NULL;
4618     if (keylen) *keylen = di->ri.key_len;
4619     if (dataptr) *dataptr = di->ri.data;
4620     return di->ri.key;
4621 }
4622 
4623 /* Like RedisModuleNextC(), but instead of returning an internally allocated
4624  * buffer and key length, it returns directly a module string object allocated
4625  * in the specified context 'ctx' (that may be NULL exactly like for the main
4626  * API RedisModule_CreateString).
4627  *
4628  * The returned string object should be deallocated after use, either manually
4629  * or by using a context that has automatic memory management active. */
RM_DictNext(RedisModuleCtx * ctx,RedisModuleDictIter * di,void ** dataptr)4630 RedisModuleString *RM_DictNext(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr) {
4631     size_t keylen;
4632     void *key = RM_DictNextC(di,&keylen,dataptr);
4633     if (key == NULL) return NULL;
4634     return RM_CreateString(ctx,key,keylen);
4635 }
4636 
4637 /* Like RedisModule_DictNext() but after returning the currently selected
4638  * element in the iterator, it selects the previous element (laxicographically
4639  * smaller) instead of the next one. */
RM_DictPrev(RedisModuleCtx * ctx,RedisModuleDictIter * di,void ** dataptr)4640 RedisModuleString *RM_DictPrev(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr) {
4641     size_t keylen;
4642     void *key = RM_DictPrevC(di,&keylen,dataptr);
4643     if (key == NULL) return NULL;
4644     return RM_CreateString(ctx,key,keylen);
4645 }
4646 
4647 /* Compare the element currently pointed by the iterator to the specified
4648  * element given by key/keylen, according to the operator 'op' (the set of
4649  * valid operators are the same valid for RedisModule_DictIteratorStart).
4650  * If the comparision is successful the command returns REDISMODULE_OK
4651  * otherwise REDISMODULE_ERR is returned.
4652  *
4653  * This is useful when we want to just emit a lexicographical range, so
4654  * in the loop, as we iterate elements, we can also check if we are still
4655  * on range.
4656  *
4657  * The function returne REDISMODULE_ERR if the iterator reached the
4658  * end of elements condition as well. */
RM_DictCompareC(RedisModuleDictIter * di,const char * op,void * key,size_t keylen)4659 int RM_DictCompareC(RedisModuleDictIter *di, const char *op, void *key, size_t keylen) {
4660     if (raxEOF(&di->ri)) return REDISMODULE_ERR;
4661     int res = raxCompare(&di->ri,op,key,keylen);
4662     return res ? REDISMODULE_OK : REDISMODULE_ERR;
4663 }
4664 
4665 /* Like RedisModule_DictCompareC but gets the key to compare with the current
4666  * iterator key as a RedisModuleString. */
RM_DictCompare(RedisModuleDictIter * di,const char * op,RedisModuleString * key)4667 int RM_DictCompare(RedisModuleDictIter *di, const char *op, RedisModuleString *key) {
4668     if (raxEOF(&di->ri)) return REDISMODULE_ERR;
4669     int res = raxCompare(&di->ri,op,key->ptr,sdslen(key->ptr));
4670     return res ? REDISMODULE_OK : REDISMODULE_ERR;
4671 }
4672 
4673 /* --------------------------------------------------------------------------
4674  * Modules utility APIs
4675  * -------------------------------------------------------------------------- */
4676 
/* Fill the 'len' bytes at 'dst' with random bytes, generated using SHA1 in
 * counter mode with a /dev/urandom initialized seed. This function is fast
 * so can be used to generate many bytes without any effect on the operating
 * system entropy pool. Currently this function is not thread safe.
 *
 * This is a thin wrapper around the core getRandomBytes() function. */
void RM_GetRandomBytes(unsigned char *dst, size_t len) {
    getRandomBytes(dst,len);
    return;
}
4684 
/* Like RedisModule_GetRandomBytes() but instead of setting the string to
 * random bytes the string is set to random characters in the hex charset
 * [0-9a-f].
 *
 * This is a thin wrapper around the core getRandomHexChars() function. */
void RM_GetRandomHexChars(char *dst, size_t len) {
    getRandomHexChars(dst,len);
    return;
}
4691 
4692 /* --------------------------------------------------------------------------
4693  * Modules API exporting / importing
4694  * -------------------------------------------------------------------------- */
4695 
4696 /* This function is called by a module in order to export some API with a
4697  * given name. Other modules will be able to use this API by calling the
4698  * symmetrical function RM_GetSharedAPI() and casting the return value to
4699  * the right function pointer.
4700  *
4701  * The function will return REDISMODULE_OK if the name is not already taken,
4702  * otherwise REDISMODULE_ERR will be returned and no operation will be
4703  * performed.
4704  *
4705  * IMPORTANT: the apiname argument should be a string literal with static
4706  * lifetime. The API relies on the fact that it will always be valid in
4707  * the future. */
RM_ExportSharedAPI(RedisModuleCtx * ctx,const char * apiname,void * func)4708 int RM_ExportSharedAPI(RedisModuleCtx *ctx, const char *apiname, void *func) {
4709     RedisModuleSharedAPI *sapi = zmalloc(sizeof(*sapi));
4710     sapi->module = ctx->module;
4711     sapi->func = func;
4712     if (dictAdd(server.sharedapi, (char*)apiname, sapi) != DICT_OK) {
4713         zfree(sapi);
4714         return REDISMODULE_ERR;
4715     }
4716     return REDISMODULE_OK;
4717 }
4718 
4719 /* Request an exported API pointer. The return value is just a void pointer
4720  * that the caller of this function will be required to cast to the right
4721  * function pointer, so this is a private contract between modules.
4722  *
4723  * If the requested API is not available then NULL is returned. Because
4724  * modules can be loaded at different times with different order, this
4725  * function calls should be put inside some module generic API registering
4726  * step, that is called every time a module attempts to execute a
4727  * command that requires external APIs: if some API cannot be resolved, the
4728  * command should return an error.
4729  *
4730  * Here is an exmaple:
4731  *
4732  *     int ... myCommandImplementation() {
4733  *        if (getExternalAPIs() == 0) {
4734  *             reply with an error here if we cannot have the APIs
4735  *        }
4736  *        // Use the API:
4737  *        myFunctionPointer(foo);
4738  *     }
4739  *
4740  * And the function registerAPI() is:
4741  *
4742  *     int getExternalAPIs(void) {
4743  *         static int api_loaded = 0;
4744  *         if (api_loaded != 0) return 1; // APIs already resolved.
4745  *
4746  *         myFunctionPointer = RedisModule_GetOtherModuleAPI("...");
4747  *         if (myFunctionPointer == NULL) return 0;
4748  *
4749  *         return 1;
4750  *     }
4751  */
RM_GetSharedAPI(RedisModuleCtx * ctx,const char * apiname)4752 void *RM_GetSharedAPI(RedisModuleCtx *ctx, const char *apiname) {
4753     dictEntry *de = dictFind(server.sharedapi, apiname);
4754     if (de == NULL) return NULL;
4755     RedisModuleSharedAPI *sapi = dictGetVal(de);
4756     if (listSearchKey(sapi->module->usedby,ctx->module) == NULL) {
4757         listAddNodeTail(sapi->module->usedby,ctx->module);
4758         listAddNodeTail(ctx->module->using,sapi->module);
4759     }
4760     return sapi->func;
4761 }
4762 
4763 /* Remove all the APIs registered by the specified module. Usually you
4764  * want this when the module is going to be unloaded. This function
4765  * assumes that's caller responsibility to make sure the APIs are not
4766  * used by other modules.
4767  *
4768  * The number of unregistered APIs is returned. */
moduleUnregisterSharedAPI(RedisModule * module)4769 int moduleUnregisterSharedAPI(RedisModule *module) {
4770     int count = 0;
4771     dictIterator *di = dictGetSafeIterator(server.sharedapi);
4772     dictEntry *de;
4773     while ((de = dictNext(di)) != NULL) {
4774         const char *apiname = dictGetKey(de);
4775         RedisModuleSharedAPI *sapi = dictGetVal(de);
4776         if (sapi->module == module) {
4777             dictDelete(server.sharedapi,apiname);
4778             zfree(sapi);
4779             count++;
4780         }
4781     }
4782     dictReleaseIterator(di);
4783     return count;
4784 }
4785 
4786 /* Remove the specified module as an user of APIs of ever other module.
4787  * This is usually called when a module is unloaded.
4788  *
4789  * Returns the number of modules this module was using APIs from. */
moduleUnregisterUsedAPI(RedisModule * module)4790 int moduleUnregisterUsedAPI(RedisModule *module) {
4791     listIter li;
4792     listNode *ln;
4793     int count = 0;
4794 
4795     listRewind(module->using,&li);
4796     while((ln = listNext(&li))) {
4797         RedisModule *used = ln->value;
4798         listNode *ln = listSearchKey(used->usedby,module);
4799         if (ln) {
4800             listDelNode(module->using,ln);
4801             count++;
4802         }
4803     }
4804     return count;
4805 }
4806 
4807 /* Unregister all filters registered by a module.
4808  * This is called when a module is being unloaded.
4809  *
4810  * Returns the number of filters unregistered. */
moduleUnregisterFilters(RedisModule * module)4811 int moduleUnregisterFilters(RedisModule *module) {
4812     listIter li;
4813     listNode *ln;
4814     int count = 0;
4815 
4816     listRewind(module->filters,&li);
4817     while((ln = listNext(&li))) {
4818         RedisModuleCommandFilter *filter = ln->value;
4819         listNode *ln = listSearchKey(moduleCommandFilters,filter);
4820         if (ln) {
4821             listDelNode(moduleCommandFilters,ln);
4822             count++;
4823         }
4824         zfree(filter);
4825     }
4826     return count;
4827 }
4828 
4829 /* --------------------------------------------------------------------------
4830  * Module Command Filter API
4831  * -------------------------------------------------------------------------- */
4832 
4833 /* Register a new command filter function.
4834  *
4835  * Command filtering makes it possible for modules to extend Redis by plugging
4836  * into the execution flow of all commands.
4837  *
4838  * A registered filter gets called before Redis executes *any* command.  This
4839  * includes both core Redis commands and commands registered by any module.  The
4840  * filter applies in all execution paths including:
4841  *
4842  * 1. Invocation by a client.
4843  * 2. Invocation through `RedisModule_Call()` by any module.
 * 3. Invocation through Lua `redis.call()`.
4845  * 4. Replication of a command from a master.
4846  *
4847  * The filter executes in a special filter context, which is different and more
4848  * limited than a RedisModuleCtx.  Because the filter affects any command, it
4849  * must be implemented in a very efficient way to reduce the performance impact
4850  * on Redis.  All Redis Module API calls that require a valid context (such as
4851  * `RedisModule_Call()`, `RedisModule_OpenKey()`, etc.) are not supported in a
4852  * filter context.
4853  *
4854  * The `RedisModuleCommandFilterCtx` can be used to inspect or modify the
4855  * executed command and its arguments.  As the filter executes before Redis
4856  * begins processing the command, any change will affect the way the command is
4857  * processed.  For example, a module can override Redis commands this way:
4858  *
4859  * 1. Register a `MODULE.SET` command which implements an extended version of
4860  *    the Redis `SET` command.
4861  * 2. Register a command filter which detects invocation of `SET` on a specific
4862  *    pattern of keys.  Once detected, the filter will replace the first
4863  *    argument from `SET` to `MODULE.SET`.
4864  * 3. When filter execution is complete, Redis considers the new command name
4865  *    and therefore executes the module's own command.
4866  *
4867  * Note that in the above use case, if `MODULE.SET` itself uses
4868  * `RedisModule_Call()` the filter will be applied on that call as well.  If
4869  * that is not desired, the `REDISMODULE_CMDFILTER_NOSELF` flag can be set when
4870  * registering the filter.
4871  *
4872  * The `REDISMODULE_CMDFILTER_NOSELF` flag prevents execution flows that
4873  * originate from the module's own `RM_Call()` from reaching the filter.  This
4874  * flag is effective for all execution flows, including nested ones, as long as
4875  * the execution begins from the module's command context or a thread-safe
4876  * context that is associated with a blocking command.
4877  *
4878  * Detached thread-safe contexts are *not* associated with the module and cannot
4879  * be protected by this flag.
4880  *
4881  * If multiple filters are registered (by the same or different modules), they
4882  * are executed in the order of registration.
4883  */
4884 
RM_RegisterCommandFilter(RedisModuleCtx * ctx,RedisModuleCommandFilterFunc callback,int flags)4885 RedisModuleCommandFilter *RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback, int flags) {
4886     RedisModuleCommandFilter *filter = zmalloc(sizeof(*filter));
4887     filter->module = ctx->module;
4888     filter->callback = callback;
4889     filter->flags = flags;
4890 
4891     listAddNodeTail(moduleCommandFilters, filter);
4892     listAddNodeTail(ctx->module->filters, filter);
4893     return filter;
4894 }
4895 
4896 /* Unregister a command filter.
4897  */
RM_UnregisterCommandFilter(RedisModuleCtx * ctx,RedisModuleCommandFilter * filter)4898 int RM_UnregisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilter *filter) {
4899     listNode *ln;
4900 
4901     /* A module can only remove its own filters */
4902     if (filter->module != ctx->module) return REDISMODULE_ERR;
4903 
4904     ln = listSearchKey(moduleCommandFilters,filter);
4905     if (!ln) return REDISMODULE_ERR;
4906     listDelNode(moduleCommandFilters,ln);
4907 
4908     ln = listSearchKey(ctx->module->filters,filter);
4909     if (!ln) return REDISMODULE_ERR;    /* Shouldn't happen */
4910     listDelNode(ctx->module->filters,ln);
4911 
4912     return REDISMODULE_OK;
4913 }
4914 
/* Run all the registered command filters, in registration order, on the
 * argument vector of client 'c'. Filters operate on a filter context that
 * starts as a copy of c->argv / c->argc; once every filter has run, the
 * (possibly modified) vector and count are stored back into the client. */
void moduleCallCommandFilters(client *c) {
    if (listLength(moduleCommandFilters) == 0) return;

    RedisModuleCommandFilterCtx fctx = {
        .argv = c->argv,
        .argc = c->argc
    };

    listIter iter;
    listNode *node;
    listRewind(moduleCommandFilters,&iter);
    while ((node = listNext(&iter)) != NULL) {
        RedisModuleCommandFilter *f = node->value;

        /* A filter registered with REDISMODULE_CMDFILTER_NOSELF is not
         * invoked while its own module is currently processing a command. */
        int skip_self = (f->flags & REDISMODULE_CMDFILTER_NOSELF) &&
                        f->module->in_call;
        if (!skip_self) f->callback(&fctx);
    }

    c->argv = fctx.argv;
    c->argc = fctx.argc;
}
4942 
/* Return the number of arguments a filtered command has.  The number of
 * arguments include the command itself.
 *
 * The value is read directly from the 'argc' field of the filter context.
 */
int RM_CommandFilterArgsCount(RedisModuleCommandFilterCtx *fctx)
{
    return fctx->argc;
}
4950 
4951 /* Return the specified command argument.  The first argument (position 0) is
4952  * the command itself, and the rest are user-provided args.
4953  */
RM_CommandFilterArgGet(RedisModuleCommandFilterCtx * fctx,int pos)4954 const RedisModuleString *RM_CommandFilterArgGet(RedisModuleCommandFilterCtx *fctx, int pos)
4955 {
4956     if (pos < 0 || pos >= fctx->argc) return NULL;
4957     return fctx->argv[pos];
4958 }
4959 
4960 /* Modify the filtered command by inserting a new argument at the specified
4961  * position.  The specified RedisModuleString argument may be used by Redis
4962  * after the filter context is destroyed, so it must not be auto-memory
4963  * allocated, freed or used elsewhere.
4964  */
4965 
RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx * fctx,int pos,RedisModuleString * arg)4966 int RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg)
4967 {
4968     int i;
4969 
4970     if (pos < 0 || pos > fctx->argc) return REDISMODULE_ERR;
4971 
4972     fctx->argv = zrealloc(fctx->argv, (fctx->argc+1)*sizeof(RedisModuleString *));
4973     for (i = fctx->argc; i > pos; i--) {
4974         fctx->argv[i] = fctx->argv[i-1];
4975     }
4976     fctx->argv[pos] = arg;
4977     fctx->argc++;
4978 
4979     return REDISMODULE_OK;
4980 }
4981 
4982 /* Modify the filtered command by replacing an existing argument with a new one.
4983  * The specified RedisModuleString argument may be used by Redis after the
4984  * filter context is destroyed, so it must not be auto-memory allocated, freed
4985  * or used elsewhere.
4986  */
4987 
RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx * fctx,int pos,RedisModuleString * arg)4988 int RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg)
4989 {
4990     if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR;
4991 
4992     decrRefCount(fctx->argv[pos]);
4993     fctx->argv[pos] = arg;
4994 
4995     return REDISMODULE_OK;
4996 }
4997 
4998 /* Modify the filtered command by deleting an argument at the specified
4999  * position.
5000  */
RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx * fctx,int pos)5001 int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos)
5002 {
5003     int i;
5004     if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR;
5005 
5006     decrRefCount(fctx->argv[pos]);
5007     for (i = pos; i < fctx->argc-1; i++) {
5008         fctx->argv[i] = fctx->argv[i+1];
5009     }
5010     fctx->argc--;
5011 
5012     return REDISMODULE_OK;
5013 }
5014 
5015 /* --------------------------------------------------------------------------
5016  * Modules API internals
5017  * -------------------------------------------------------------------------- */
5018 
5019 /* server.moduleapi dictionary type. Only uses plain C strings since
5020  * this gets queries from modules. */
5021 
/* Hash callback for dictionaries keyed by NUL-terminated C strings: the
 * hash is computed over the strlen() bytes of the key. */
uint64_t dictCStringKeyHash(const void *key) {
    size_t keylen = strlen((char*)key);
    return dictGenHashFunction((unsigned char*)key, keylen);
}
5025 
/* Key comparison callback for dictionaries keyed by NUL-terminated C
 * strings. Returns 1 if the two keys are equal according to strcmp(),
 * 0 otherwise. 'privdata' is not used. */
int dictCStringKeyCompare(void *privdata, const void *key1, const void *key2) {
    UNUSED(privdata);
    int differ = strcmp(key1,key2);
    return differ == 0;
}
5030 
/* Dictionary type for tables whose keys are plain C strings. It is used to
 * create both server.moduleapi and server.sharedapi. All the dup and
 * destructor callbacks are NULL: the dictionary stores the key and value
 * pointers it is given and never copies or frees them. */
dictType moduleAPIDictType = {
    dictCStringKeyHash,        /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictCStringKeyCompare,     /* key compare */
    NULL,                      /* key destructor */
    NULL                       /* val destructor */
};
5039 
/* Add the function pointer 'funcptr' to the server.moduleapi dictionary
 * under the name 'funcname'. The pointer 'funcname' itself is used as the
 * key. The return value is the one of dictAdd(). */
int moduleRegisterApi(const char *funcname, void *funcptr) {
    return dictAdd(server.moduleapi, (char*)funcname, funcptr);
}
5043 
/* Register the core function RM_<name> under the public name
 * "RedisModule_<name>". The function pointer is converted to a void
 * pointer passing through an unsigned long cast. */
#define REGISTER_API(name) \
    moduleRegisterApi("RedisModule_" #name, (void *)(unsigned long)RM_ ## name)
5046 
/* Forward declaration: the function is defined at the end of this file. */
void moduleRegisterCoreAPI(void);

/* Global initialization at Redis startup.
 *
 * Creates the lists, dictionaries and radix tree used by the modules
 * system, registers the core API, creates the pipe used by module blocking
 * commands and acquires the modules GIL. If the pipe cannot be created the
 * error is logged and the process exits. */
void moduleInitModulesSystem(void) {
    moduleUnblockedClients = listCreate();
    server.loadmodule_queue = listCreate();
    modules = dictCreate(&modulesDictType,NULL);

    /* Set up the keyspace notification subscriber list and static client */
    moduleKeyspaceSubscribers = listCreate();
    moduleFreeContextReusedClient = createClient(-1);
    moduleFreeContextReusedClient->flags |= CLIENT_MODULE;

    /* Set up filter list */
    moduleCommandFilters = listCreate();

    moduleRegisterCoreAPI();
    if (pipe(server.module_blocked_pipe) == -1) {
        serverLog(LL_WARNING,
            "Can't create the pipe for module blocking commands: %s",
            strerror(errno));
        exit(1);
    }
    /* Make the pipe non blocking. This is just a best effort aware mechanism
     * and we do not want to block not in the read nor in the write half. */
    anetNonBlock(NULL,server.module_blocked_pipe[0]);
    anetNonBlock(NULL,server.module_blocked_pipe[1]);

    /* Create the timers radix tree. */
    Timers = raxNew();

    /* Our thread-safe contexts GIL must start with already locked:
     * it is just unlocked when it's safe. */
    pthread_mutex_lock(&moduleGIL);
}
5082 
5083 /* Load all the modules in the server.loadmodule_queue list, which is
5084  * populated by `loadmodule` directives in the configuration file.
5085  * We can't load modules directly when processing the configuration file
5086  * because the server must be fully initialized before loading modules.
5087  *
5088  * The function aborts the server on errors, since to start with missing
5089  * modules is not considered sane: clients may rely on the existence of
5090  * given commands, loading AOF also may need some modules to exist, and
5091  * if this instance is a slave, it must understand commands from master. */
moduleLoadFromQueue(void)5092 void moduleLoadFromQueue(void) {
5093     listIter li;
5094     listNode *ln;
5095 
5096     listRewind(server.loadmodule_queue,&li);
5097     while((ln = listNext(&li))) {
5098         struct moduleLoadQueueEntry *loadmod = ln->value;
5099         if (moduleLoad(loadmod->path,(void **)loadmod->argv,loadmod->argc)
5100             == C_ERR)
5101         {
5102             serverLog(LL_WARNING,
5103                 "Can't load module from %s: server aborting",
5104                 loadmod->path);
5105             exit(1);
5106         }
5107     }
5108 }
5109 
moduleFreeModuleStructure(struct RedisModule * module)5110 void moduleFreeModuleStructure(struct RedisModule *module) {
5111     listRelease(module->types);
5112     listRelease(module->filters);
5113     sdsfree(module->name);
5114     zfree(module);
5115 }
5116 
moduleUnregisterCommands(struct RedisModule * module)5117 void moduleUnregisterCommands(struct RedisModule *module) {
5118     /* Unregister all the commands registered by this module. */
5119     dictIterator *di = dictGetSafeIterator(server.commands);
5120     dictEntry *de;
5121     while ((de = dictNext(di)) != NULL) {
5122         struct redisCommand *cmd = dictGetVal(de);
5123         if (cmd->proc == RedisModuleCommandDispatcher) {
5124             RedisModuleCommandProxy *cp =
5125                 (void*)(unsigned long)cmd->getkeys_proc;
5126             sds cmdname = cp->rediscmd->name;
5127             if (cp->module == module) {
5128                 dictDelete(server.commands,cmdname);
5129                 dictDelete(server.orig_commands,cmdname);
5130                 sdsfree(cmdname);
5131                 zfree(cp->rediscmd);
5132                 zfree(cp);
5133             }
5134         }
5135     }
5136     dictReleaseIterator(di);
5137 }
5138 
5139 /* Load a module and initialize it. On success C_OK is returned, otherwise
5140  * C_ERR is returned. */
moduleLoad(const char * path,void ** module_argv,int module_argc)5141 int moduleLoad(const char *path, void **module_argv, int module_argc) {
5142     int (*onload)(void *, void **, int);
5143     void *handle;
5144     RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
5145 
5146     handle = dlopen(path,RTLD_NOW|RTLD_LOCAL);
5147     if (handle == NULL) {
5148         serverLog(LL_WARNING, "Module %s failed to load: %s", path, dlerror());
5149         return C_ERR;
5150     }
5151     onload = (int (*)(void *, void **, int))(unsigned long) dlsym(handle,"RedisModule_OnLoad");
5152     if (onload == NULL) {
5153         dlclose(handle);
5154         serverLog(LL_WARNING,
5155             "Module %s does not export RedisModule_OnLoad() "
5156             "symbol. Module not loaded.",path);
5157         return C_ERR;
5158     }
5159     if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) {
5160         if (ctx.module) {
5161             moduleUnregisterCommands(ctx.module);
5162             moduleUnregisterSharedAPI(ctx.module);
5163             moduleUnregisterUsedAPI(ctx.module);
5164             moduleFreeModuleStructure(ctx.module);
5165         }
5166         dlclose(handle);
5167         serverLog(LL_WARNING,
5168             "Module %s initialization failed. Module not loaded",path);
5169         return C_ERR;
5170     }
5171 
5172     /* Redis module loaded! Register it. */
5173     dictAdd(modules,ctx.module->name,ctx.module);
5174     ctx.module->handle = handle;
5175     serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path);
5176     moduleFreeContext(&ctx);
5177     return C_OK;
5178 }
5179 
5180 
5181 /* Unload the module registered with the specified name. On success
5182  * C_OK is returned, otherwise C_ERR is returned and errno is set
5183  * to the following values depending on the type of error:
5184  *
5185  * * ENONET: No such module having the specified name.
5186  * * EBUSY: The module exports a new data type and can only be reloaded. */
moduleUnload(sds name)5187 int moduleUnload(sds name) {
5188     struct RedisModule *module = dictFetchValue(modules,name);
5189 
5190     if (module == NULL) {
5191         errno = ENOENT;
5192         return REDISMODULE_ERR;
5193     } else if (listLength(module->types)) {
5194         errno = EBUSY;
5195         return REDISMODULE_ERR;
5196     } else if (listLength(module->usedby)) {
5197         errno = EPERM;
5198         return REDISMODULE_ERR;
5199     }
5200 
5201     moduleUnregisterCommands(module);
5202     moduleUnregisterSharedAPI(module);
5203     moduleUnregisterUsedAPI(module);
5204     moduleUnregisterFilters(module);
5205 
5206     /* Remove any notification subscribers this module might have */
5207     moduleUnsubscribeNotifications(module);
5208 
5209     /* Unregister all the hooks. TODO: Yet no hooks support here. */
5210 
5211     /* Unload the dynamic library. */
5212     if (dlclose(module->handle) == -1) {
5213         char *error = dlerror();
5214         if (error == NULL) error = "Unknown error";
5215         serverLog(LL_WARNING,"Error when trying to close the %s module: %s",
5216             module->name, error);
5217     }
5218 
5219     /* Remove from list of modules. */
5220     serverLog(LL_NOTICE,"Module %s unloaded",module->name);
5221     dictDelete(modules,module->name);
5222     module->name = NULL; /* The name was already freed by dictDelete(). */
5223     moduleFreeModuleStructure(module);
5224 
5225     return REDISMODULE_OK;
5226 }
5227 
5228 /* Redis MODULE command.
5229  *
5230  * MODULE LOAD <path> [args...] */
moduleCommand(client * c)5231 void moduleCommand(client *c) {
5232     char *subcmd = c->argv[1]->ptr;
5233     if (c->argc == 2 && !strcasecmp(subcmd,"help")) {
5234         const char *help[] = {
5235 "LIST -- Return a list of loaded modules.",
5236 "LOAD <path> [arg ...] -- Load a module library from <path>.",
5237 "UNLOAD <name> -- Unload a module.",
5238 NULL
5239         };
5240         addReplyHelp(c, help);
5241     } else
5242     if (!strcasecmp(subcmd,"load") && c->argc >= 3) {
5243         robj **argv = NULL;
5244         int argc = 0;
5245 
5246         if (c->argc > 3) {
5247             argc = c->argc - 3;
5248             argv = &c->argv[3];
5249         }
5250 
5251         if (moduleLoad(c->argv[2]->ptr,(void **)argv,argc) == C_OK)
5252             addReply(c,shared.ok);
5253         else
5254             addReplyError(c,
5255                 "Error loading the extension. Please check the server logs.");
5256     } else if (!strcasecmp(subcmd,"unload") && c->argc == 3) {
5257         if (moduleUnload(c->argv[2]->ptr) == C_OK)
5258             addReply(c,shared.ok);
5259         else {
5260             char *errmsg;
5261             switch(errno) {
5262             case ENOENT:
5263                 errmsg = "no such module with that name";
5264                 break;
5265             case EBUSY:
5266                 errmsg = "the module exports one or more module-side data "
5267                          "types, can't unload";
5268                 break;
5269             case EPERM:
5270                 errmsg = "the module exports APIs used by other modules. "
5271                          "Please unload them first and try again";
5272                 break;
5273             default:
5274                 errmsg = "operation not possible.";
5275                 break;
5276             }
5277             addReplyErrorFormat(c,"Error unloading module: %s",errmsg);
5278         }
5279     } else if (!strcasecmp(subcmd,"list") && c->argc == 2) {
5280         dictIterator *di = dictGetIterator(modules);
5281         dictEntry *de;
5282 
5283         addReplyMultiBulkLen(c,dictSize(modules));
5284         while ((de = dictNext(di)) != NULL) {
5285             sds name = dictGetKey(de);
5286             struct RedisModule *module = dictGetVal(de);
5287             addReplyMultiBulkLen(c,4);
5288             addReplyBulkCString(c,"name");
5289             addReplyBulkCBuffer(c,name,sdslen(name));
5290             addReplyBulkCString(c,"ver");
5291             addReplyLongLong(c,module->ver);
5292         }
5293         dictReleaseIterator(di);
5294     } else {
5295         addReplySubcommandSyntaxError(c);
5296         return;
5297     }
5298 }
5299 
/* Return the number of registered modules, that is, the number of entries
 * currently stored in the 'modules' dictionary. */
size_t moduleCount(void) {
    return dictSize(modules);
}
5304 
/* Register all the APIs we export. Keep this function at the end of the
 * file so that's easy to seek it to add new entries.
 *
 * The function first creates the server.moduleapi and server.sharedapi
 * dictionaries, then registers every exported function: REGISTER_API(Foo)
 * adds the function RM_Foo under the name "RedisModule_Foo". */
void moduleRegisterCoreAPI(void) {
    server.moduleapi = dictCreate(&moduleAPIDictType,NULL);
    server.sharedapi = dictCreate(&moduleAPIDictType,NULL);
    REGISTER_API(Alloc);
    REGISTER_API(Calloc);
    REGISTER_API(Realloc);
    REGISTER_API(Free);
    REGISTER_API(Strdup);
    REGISTER_API(CreateCommand);
    REGISTER_API(SetModuleAttribs);
    REGISTER_API(IsModuleNameBusy);
    REGISTER_API(WrongArity);
    REGISTER_API(ReplyWithLongLong);
    REGISTER_API(ReplyWithError);
    REGISTER_API(ReplyWithSimpleString);
    REGISTER_API(ReplyWithArray);
    REGISTER_API(ReplySetArrayLength);
    REGISTER_API(ReplyWithString);
    REGISTER_API(ReplyWithStringBuffer);
    REGISTER_API(ReplyWithNull);
    REGISTER_API(ReplyWithCallReply);
    REGISTER_API(ReplyWithDouble);
    REGISTER_API(GetSelectedDb);
    REGISTER_API(SelectDb);
    REGISTER_API(OpenKey);
    REGISTER_API(CloseKey);
    REGISTER_API(KeyType);
    REGISTER_API(ValueLength);
    REGISTER_API(ListPush);
    REGISTER_API(ListPop);
    REGISTER_API(StringToLongLong);
    REGISTER_API(StringToDouble);
    REGISTER_API(Call);
    REGISTER_API(CallReplyProto);
    REGISTER_API(FreeCallReply);
    REGISTER_API(CallReplyInteger);
    REGISTER_API(CallReplyType);
    REGISTER_API(CallReplyLength);
    REGISTER_API(CallReplyArrayElement);
    REGISTER_API(CallReplyStringPtr);
    REGISTER_API(CreateStringFromCallReply);
    REGISTER_API(CreateString);
    REGISTER_API(CreateStringFromLongLong);
    REGISTER_API(CreateStringFromString);
    REGISTER_API(CreateStringPrintf);
    REGISTER_API(FreeString);
    REGISTER_API(StringPtrLen);
    REGISTER_API(AutoMemory);
    REGISTER_API(Replicate);
    REGISTER_API(ReplicateVerbatim);
    REGISTER_API(DeleteKey);
    REGISTER_API(UnlinkKey);
    REGISTER_API(StringSet);
    REGISTER_API(StringDMA);
    REGISTER_API(StringTruncate);
    REGISTER_API(SetExpire);
    REGISTER_API(GetExpire);
    REGISTER_API(ZsetAdd);
    REGISTER_API(ZsetIncrby);
    REGISTER_API(ZsetScore);
    REGISTER_API(ZsetRem);
    REGISTER_API(ZsetRangeStop);
    REGISTER_API(ZsetFirstInScoreRange);
    REGISTER_API(ZsetLastInScoreRange);
    REGISTER_API(ZsetFirstInLexRange);
    REGISTER_API(ZsetLastInLexRange);
    REGISTER_API(ZsetRangeCurrentElement);
    REGISTER_API(ZsetRangeNext);
    REGISTER_API(ZsetRangePrev);
    REGISTER_API(ZsetRangeEndReached);
    REGISTER_API(HashSet);
    REGISTER_API(HashGet);
    REGISTER_API(IsKeysPositionRequest);
    REGISTER_API(KeyAtPos);
    REGISTER_API(GetClientId);
    REGISTER_API(GetContextFlags);
    REGISTER_API(PoolAlloc);
    REGISTER_API(CreateDataType);
    REGISTER_API(ModuleTypeSetValue);
    REGISTER_API(ModuleTypeGetType);
    REGISTER_API(ModuleTypeGetValue);
    REGISTER_API(SaveUnsigned);
    REGISTER_API(LoadUnsigned);
    REGISTER_API(SaveSigned);
    REGISTER_API(LoadSigned);
    REGISTER_API(SaveString);
    REGISTER_API(SaveStringBuffer);
    REGISTER_API(LoadString);
    REGISTER_API(LoadStringBuffer);
    REGISTER_API(SaveDouble);
    REGISTER_API(LoadDouble);
    REGISTER_API(SaveFloat);
    REGISTER_API(LoadFloat);
    REGISTER_API(EmitAOF);
    REGISTER_API(Log);
    REGISTER_API(LogIOError);
    REGISTER_API(StringAppendBuffer);
    REGISTER_API(RetainString);
    REGISTER_API(StringCompare);
    REGISTER_API(GetContextFromIO);
    REGISTER_API(GetKeyNameFromIO);
    REGISTER_API(BlockClient);
    REGISTER_API(UnblockClient);
    REGISTER_API(IsBlockedReplyRequest);
    REGISTER_API(IsBlockedTimeoutRequest);
    REGISTER_API(GetBlockedClientPrivateData);
    REGISTER_API(AbortBlock);
    REGISTER_API(Milliseconds);
    REGISTER_API(GetThreadSafeContext);
    REGISTER_API(FreeThreadSafeContext);
    REGISTER_API(ThreadSafeContextLock);
    REGISTER_API(ThreadSafeContextUnlock);
    REGISTER_API(DigestAddStringBuffer);
    REGISTER_API(DigestAddLongLong);
    REGISTER_API(DigestEndSequence);
    REGISTER_API(SubscribeToKeyspaceEvents);
    REGISTER_API(RegisterClusterMessageReceiver);
    REGISTER_API(SendClusterMessage);
    REGISTER_API(GetClusterNodeInfo);
    REGISTER_API(GetClusterNodesList);
    REGISTER_API(FreeClusterNodesList);
    REGISTER_API(CreateTimer);
    REGISTER_API(StopTimer);
    REGISTER_API(GetTimerInfo);
    REGISTER_API(GetMyClusterID);
    REGISTER_API(GetClusterSize);
    REGISTER_API(GetRandomBytes);
    REGISTER_API(GetRandomHexChars);
    REGISTER_API(BlockedClientDisconnected);
    REGISTER_API(SetDisconnectCallback);
    REGISTER_API(GetBlockedClientHandle);
    REGISTER_API(SetClusterFlags);
    REGISTER_API(CreateDict);
    REGISTER_API(FreeDict);
    REGISTER_API(DictSize);
    REGISTER_API(DictSetC);
    REGISTER_API(DictReplaceC);
    REGISTER_API(DictSet);
    REGISTER_API(DictReplace);
    REGISTER_API(DictGetC);
    REGISTER_API(DictGet);
    REGISTER_API(DictDelC);
    REGISTER_API(DictDel);
    REGISTER_API(DictIteratorStartC);
    REGISTER_API(DictIteratorStart);
    REGISTER_API(DictIteratorStop);
    REGISTER_API(DictIteratorReseekC);
    REGISTER_API(DictIteratorReseek);
    REGISTER_API(DictNextC);
    REGISTER_API(DictPrevC);
    REGISTER_API(DictNext);
    REGISTER_API(DictPrev);
    REGISTER_API(DictCompareC);
    REGISTER_API(DictCompare);
    REGISTER_API(ExportSharedAPI);
    REGISTER_API(GetSharedAPI);
    REGISTER_API(RegisterCommandFilter);
    REGISTER_API(UnregisterCommandFilter);
    REGISTER_API(CommandFilterArgsCount);
    REGISTER_API(CommandFilterArgGet);
    REGISTER_API(CommandFilterArgInsert);
    REGISTER_API(CommandFilterArgReplace);
    REGISTER_API(CommandFilterArgDelete);
}
5471