1 /*
2 * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31 #include "cluster.h"
32 #include <dlfcn.h>
33
34 #define REDISMODULE_CORE 1
35 #include "redismodule.h"
36
37 /* --------------------------------------------------------------------------
38 * Private data structures used by the modules system. Those are data
39 * structures that are never exposed to Redis Modules, if not as void
40 * pointers that have an API the module can call with them)
41 * -------------------------------------------------------------------------- */
42
43 /* This structure represents a module inside the system. */
44 struct RedisModule {
45 void *handle; /* Module dlopen() handle. */
46 char *name; /* Module name. */
47 int ver; /* Module version. We use just progressive integers. */
48 int apiver; /* Module API version as requested during initialization.*/
49 list *types; /* Module data types. */
50 list *usedby; /* List of modules using APIs from this one. */
51 list *using; /* List of modules we use some APIs of. */
52 list *filters; /* List of filters the module has registered. */
53 int in_call; /* RM_Call() nesting level */
54 };
55 typedef struct RedisModule RedisModule;
56
57 /* This represents a shared API. Shared APIs will be used to populate
58 * the server.sharedapi dictionary, mapping names of APIs exported by
59 * modules for other modules to use, to their structure specifying the
60 * function pointer that can be called. */
61 struct RedisModuleSharedAPI {
62 void *func;
63 RedisModule *module;
64 };
65 typedef struct RedisModuleSharedAPI RedisModuleSharedAPI;
66
67 static dict *modules; /* Hash table of modules. SDS -> RedisModule ptr.*/
68
69 /* Entries in the context->amqueue array, representing objects to free
70 * when the callback returns. */
71 struct AutoMemEntry {
72 void *ptr;
73 int type;
74 };
75
76 /* AutMemEntry type field values. */
77 #define REDISMODULE_AM_KEY 0
78 #define REDISMODULE_AM_STRING 1
79 #define REDISMODULE_AM_REPLY 2
80 #define REDISMODULE_AM_FREED 3 /* Explicitly freed by user already. */
81 #define REDISMODULE_AM_DICT 4
82
83 /* The pool allocator block. Redis Modules can allocate memory via this special
84 * allocator that will automatically release it all once the callback returns.
85 * This means that it can only be used for ephemeral allocations. However
86 * there are two advantages for modules to use this API:
87 *
88 * 1) The memory is automatically released when the callback returns.
89 * 2) This allocator is faster for many small allocations since whole blocks
90 * are allocated, and small pieces returned to the caller just advancing
91 * the index of the allocation.
92 *
93 * Allocations are always rounded to the size of the void pointer in order
94 * to always return aligned memory chunks. */
95
96 #define REDISMODULE_POOL_ALLOC_MIN_SIZE (1024*8)
97 #define REDISMODULE_POOL_ALLOC_ALIGN (sizeof(void*))
98
99 typedef struct RedisModulePoolAllocBlock {
100 uint32_t size;
101 uint32_t used;
102 struct RedisModulePoolAllocBlock *next;
103 char memory[];
104 } RedisModulePoolAllocBlock;
105
106 /* This structure represents the context in which Redis modules operate.
107 * Most APIs module can access, get a pointer to the context, so that the API
108 * implementation can hold state across calls, or remember what to free after
109 * the call and so forth.
110 *
111 * Note that not all the context structure is always filled with actual values
112 * but only the fields needed in a given context. */
113
114 struct RedisModuleBlockedClient;
115
116 struct RedisModuleCtx {
117 void *getapifuncptr; /* NOTE: Must be the first field. */
118 struct RedisModule *module; /* Module reference. */
119 client *client; /* Client calling a command. */
120 struct RedisModuleBlockedClient *blocked_client; /* Blocked client for
121 thread safe context. */
122 struct AutoMemEntry *amqueue; /* Auto memory queue of objects to free. */
123 int amqueue_len; /* Number of slots in amqueue. */
124 int amqueue_used; /* Number of used slots in amqueue. */
125 int flags; /* REDISMODULE_CTX_... flags. */
126 void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
127 int postponed_arrays_count; /* Number of entries in postponed_arrays. */
128 void *blocked_privdata; /* Privdata set when unblocking a client. */
129
130 /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
131 int *keys_pos;
132 int keys_count;
133
134 struct RedisModulePoolAllocBlock *pa_head;
135 };
136 typedef struct RedisModuleCtx RedisModuleCtx;
137
138 #define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL}
139 #define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
140 #define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
141 #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
142 #define REDISMODULE_CTX_BLOCKED_REPLY (1<<3)
143 #define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
144 #define REDISMODULE_CTX_THREAD_SAFE (1<<5)
145 #define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<6)
146
147 /* This represents a Redis key opened with RM_OpenKey(). */
148 struct RedisModuleKey {
149 RedisModuleCtx *ctx;
150 redisDb *db;
151 robj *key; /* Key name object. */
152 robj *value; /* Value object, or NULL if the key was not found. */
153 void *iter; /* Iterator. */
154 int mode; /* Opening mode. */
155
156 /* Zset iterator. */
157 uint32_t ztype; /* REDISMODULE_ZSET_RANGE_* */
158 zrangespec zrs; /* Score range. */
159 zlexrangespec zlrs; /* Lex range. */
160 uint32_t zstart; /* Start pos for positional ranges. */
161 uint32_t zend; /* End pos for positional ranges. */
162 void *zcurrent; /* Zset iterator current node. */
163 int zer; /* Zset iterator end reached flag
164 (true if end was reached). */
165 };
166 typedef struct RedisModuleKey RedisModuleKey;
167
168 /* RedisModuleKey 'ztype' values. */
169 #define REDISMODULE_ZSET_RANGE_NONE 0 /* This must always be 0. */
170 #define REDISMODULE_ZSET_RANGE_LEX 1
171 #define REDISMODULE_ZSET_RANGE_SCORE 2
172 #define REDISMODULE_ZSET_RANGE_POS 3
173
174 /* Function pointer type of a function representing a command inside
175 * a Redis module. */
176 struct RedisModuleBlockedClient;
177 typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, void **argv, int argc);
178 typedef void (*RedisModuleDisconnectFunc) (RedisModuleCtx *ctx, struct RedisModuleBlockedClient *bc);
179
180 /* This struct holds the information about a command registered by a module.*/
181 struct RedisModuleCommandProxy {
182 struct RedisModule *module;
183 RedisModuleCmdFunc func;
184 struct redisCommand *rediscmd;
185 };
186 typedef struct RedisModuleCommandProxy RedisModuleCommandProxy;
187
188 #define REDISMODULE_REPLYFLAG_NONE 0
189 #define REDISMODULE_REPLYFLAG_TOPARSE (1<<0) /* Protocol must be parsed. */
190 #define REDISMODULE_REPLYFLAG_NESTED (1<<1) /* Nested reply object. No proto
191 or struct free. */
192
193 /* Reply of RM_Call() function. The function is filled in a lazy
194 * way depending on the function called on the reply structure. By default
195 * only the type, proto and protolen are filled. */
196 typedef struct RedisModuleCallReply {
197 RedisModuleCtx *ctx;
198 int type; /* REDISMODULE_REPLY_... */
199 int flags; /* REDISMODULE_REPLYFLAG_... */
200 size_t len; /* Len of strings or num of elements of arrays. */
201 char *proto; /* Raw reply protocol. An SDS string at top-level object. */
202 size_t protolen;/* Length of protocol. */
203 union {
204 const char *str; /* String pointer for string and error replies. This
205 does not need to be freed, always points inside
206 a reply->proto buffer of the reply object or, in
207 case of array elements, of parent reply objects. */
208 long long ll; /* Reply value for integer reply. */
209 struct RedisModuleCallReply *array; /* Array of sub-reply elements. */
210 } val;
211 } RedisModuleCallReply;
212
213 /* Structure representing a blocked client. We get a pointer to such
214 * an object when blocking from modules. */
215 typedef struct RedisModuleBlockedClient {
216 client *client; /* Pointer to the blocked client. or NULL if the client
217 was destroyed during the life of this object. */
218 RedisModule *module; /* Module blocking the client. */
219 RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/
220 RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */
221 RedisModuleDisconnectFunc disconnect_callback; /* Called on disconnection.*/
222 void (*free_privdata)(RedisModuleCtx*,void*);/* privdata cleanup callback.*/
223 void *privdata; /* Module private data that may be used by the reply
224 or timeout callback. It is set via the
225 RedisModule_UnblockClient() API. */
226 client *reply_client; /* Fake client used to accumulate replies
227 in thread safe contexts. */
228 int dbid; /* Database number selected by the original client. */
229 } RedisModuleBlockedClient;
230
231 static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
232 static list *moduleUnblockedClients;
233
234 /* We need a mutex that is unlocked / relocked in beforeSleep() in order to
235 * allow thread safe contexts to execute commands at a safe moment. */
236 static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
237
238
239 /* Function pointer type for keyspace event notification subscriptions from modules. */
240 typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
241
242 /* Keyspace notification subscriber information.
243 * See RM_SubscribeToKeyspaceEvents() for more information. */
244 typedef struct RedisModuleKeyspaceSubscriber {
245 /* The module subscribed to the event */
246 RedisModule *module;
247 /* Notification callback in the module*/
248 RedisModuleNotificationFunc notify_callback;
249 /* A bit mask of the events the module is interested in */
250 int event_mask;
251 /* Active flag set on entry, to avoid reentrant subscribers
252 * calling themselves */
253 int active;
254 } RedisModuleKeyspaceSubscriber;
255
256 /* The module keyspace notification subscribers list */
257 static list *moduleKeyspaceSubscribers;
258
259 /* Static client recycled for when we need to provide a context with a client
260 * in a situation where there is no client to provide. This avoidsallocating
261 * a new client per round. For instance this is used in the keyspace
262 * notifications, timers and cluster messages callbacks. */
263 static client *moduleFreeContextReusedClient;
264
265 /* Data structures related to the exported dictionary data structure. */
266 typedef struct RedisModuleDict {
267 rax *rax; /* The radix tree. */
268 } RedisModuleDict;
269
270 typedef struct RedisModuleDictIter {
271 RedisModuleDict *dict;
272 raxIterator ri;
273 } RedisModuleDictIter;
274
275 typedef struct RedisModuleCommandFilterCtx {
276 RedisModuleString **argv;
277 int argc;
278 } RedisModuleCommandFilterCtx;
279
280 typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
281
282 typedef struct RedisModuleCommandFilter {
283 /* The module that registered the filter */
284 RedisModule *module;
285 /* Filter callback function */
286 RedisModuleCommandFilterFunc callback;
287 /* REDISMODULE_CMDFILTER_* flags */
288 int flags;
289 } RedisModuleCommandFilter;
290
291 /* Registered filters */
292 static list *moduleCommandFilters;
293
294 /* --------------------------------------------------------------------------
295 * Prototypes
296 * -------------------------------------------------------------------------- */
297
298 void RM_FreeCallReply(RedisModuleCallReply *reply);
299 void RM_CloseKey(RedisModuleKey *key);
300 void autoMemoryCollect(RedisModuleCtx *ctx);
301 robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap);
302 void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx);
303 void RM_ZsetRangeStop(RedisModuleKey *kp);
304 static void zsetKeyReset(RedisModuleKey *key);
305 void RM_FreeDict(RedisModuleCtx *ctx, RedisModuleDict *d);
306
307 /* --------------------------------------------------------------------------
308 * Heap allocation raw functions
309 * -------------------------------------------------------------------------- */
310
311 /* Use like malloc(). Memory allocated with this function is reported in
312 * Redis INFO memory, used for keys eviction according to maxmemory settings
313 * and in general is taken into account as memory allocated by Redis.
314 * You should avoid using malloc(). */
RM_Alloc(size_t bytes)315 void *RM_Alloc(size_t bytes) {
316 return zmalloc(bytes);
317 }
318
319 /* Use like calloc(). Memory allocated with this function is reported in
320 * Redis INFO memory, used for keys eviction according to maxmemory settings
321 * and in general is taken into account as memory allocated by Redis.
322 * You should avoid using calloc() directly. */
RM_Calloc(size_t nmemb,size_t size)323 void *RM_Calloc(size_t nmemb, size_t size) {
324 return zcalloc(nmemb*size);
325 }
326
327 /* Use like realloc() for memory obtained with RedisModule_Alloc(). */
RM_Realloc(void * ptr,size_t bytes)328 void* RM_Realloc(void *ptr, size_t bytes) {
329 return zrealloc(ptr,bytes);
330 }
331
332 /* Use like free() for memory obtained by RedisModule_Alloc() and
333 * RedisModule_Realloc(). However you should never try to free with
334 * RedisModule_Free() memory allocated with malloc() inside your module. */
RM_Free(void * ptr)335 void RM_Free(void *ptr) {
336 zfree(ptr);
337 }
338
339 /* Like strdup() but returns memory allocated with RedisModule_Alloc(). */
RM_Strdup(const char * str)340 char *RM_Strdup(const char *str) {
341 return zstrdup(str);
342 }
343
344 /* --------------------------------------------------------------------------
345 * Pool allocator
346 * -------------------------------------------------------------------------- */
347
348 /* Release the chain of blocks used for pool allocations. */
poolAllocRelease(RedisModuleCtx * ctx)349 void poolAllocRelease(RedisModuleCtx *ctx) {
350 RedisModulePoolAllocBlock *head = ctx->pa_head, *next;
351
352 while(head != NULL) {
353 next = head->next;
354 zfree(head);
355 head = next;
356 }
357 ctx->pa_head = NULL;
358 }
359
360 /* Return heap allocated memory that will be freed automatically when the
361 * module callback function returns. Mostly suitable for small allocations
362 * that are short living and must be released when the callback returns
363 * anyway. The returned memory is aligned to the architecture word size
364 * if at least word size bytes are requested, otherwise it is just
365 * aligned to the next power of two, so for example a 3 bytes request is
366 * 4 bytes aligned while a 2 bytes request is 2 bytes aligned.
367 *
368 * There is no realloc style function since when this is needed to use the
369 * pool allocator is not a good idea.
370 *
371 * The function returns NULL if `bytes` is 0. */
RM_PoolAlloc(RedisModuleCtx * ctx,size_t bytes)372 void *RM_PoolAlloc(RedisModuleCtx *ctx, size_t bytes) {
373 if (bytes == 0) return NULL;
374 RedisModulePoolAllocBlock *b = ctx->pa_head;
375 size_t left = b ? b->size - b->used : 0;
376
377 /* Fix alignment. */
378 if (left >= bytes) {
379 size_t alignment = REDISMODULE_POOL_ALLOC_ALIGN;
380 while (bytes < alignment && alignment/2 >= bytes) alignment /= 2;
381 if (b->used % alignment)
382 b->used += alignment - (b->used % alignment);
383 left = (b->used > b->size) ? 0 : b->size - b->used;
384 }
385
386 /* Create a new block if needed. */
387 if (left < bytes) {
388 size_t blocksize = REDISMODULE_POOL_ALLOC_MIN_SIZE;
389 if (blocksize < bytes) blocksize = bytes;
390 b = zmalloc(sizeof(*b) + blocksize);
391 b->size = blocksize;
392 b->used = 0;
393 b->next = ctx->pa_head;
394 ctx->pa_head = b;
395 }
396
397 char *retval = b->memory + b->used;
398 b->used += bytes;
399 return retval;
400 }
401
402 /* --------------------------------------------------------------------------
403 * Helpers for modules API implementation
404 * -------------------------------------------------------------------------- */
405
406 /* Create an empty key of the specified type. 'kp' must point to a key object
407 * opened for writing where the .value member is set to NULL because the
408 * key was found to be non existing.
409 *
410 * On success REDISMODULE_OK is returned and the key is populated with
411 * the value of the specified type. The function fails and returns
412 * REDISMODULE_ERR if:
413 *
414 * 1) The key is not open for writing.
415 * 2) The key is not empty.
416 * 3) The specified type is unknown.
417 */
moduleCreateEmptyKey(RedisModuleKey * key,int type)418 int moduleCreateEmptyKey(RedisModuleKey *key, int type) {
419 robj *obj;
420
421 /* The key must be open for writing and non existing to proceed. */
422 if (!(key->mode & REDISMODULE_WRITE) || key->value)
423 return REDISMODULE_ERR;
424
425 switch(type) {
426 case REDISMODULE_KEYTYPE_LIST:
427 obj = createQuicklistObject();
428 quicklistSetOptions(obj->ptr, server.list_max_ziplist_size,
429 server.list_compress_depth);
430 break;
431 case REDISMODULE_KEYTYPE_ZSET:
432 obj = createZsetZiplistObject();
433 break;
434 case REDISMODULE_KEYTYPE_HASH:
435 obj = createHashObject();
436 break;
437 default: return REDISMODULE_ERR;
438 }
439 dbAdd(key->db,key->key,obj);
440 key->value = obj;
441 return REDISMODULE_OK;
442 }
443
444 /* This function is called in low-level API implementation functions in order
445 * to check if the value associated with the key remained empty after an
446 * operation that removed elements from an aggregate data type.
447 *
448 * If this happens, the key is deleted from the DB and the key object state
449 * is set to the right one in order to be targeted again by write operations
450 * possibly recreating the key if needed.
451 *
452 * The function returns 1 if the key value object is found empty and is
453 * deleted, otherwise 0 is returned. */
moduleDelKeyIfEmpty(RedisModuleKey * key)454 int moduleDelKeyIfEmpty(RedisModuleKey *key) {
455 if (!(key->mode & REDISMODULE_WRITE) || key->value == NULL) return 0;
456 int isempty;
457 robj *o = key->value;
458
459 switch(o->type) {
460 case OBJ_LIST: isempty = listTypeLength(o) == 0; break;
461 case OBJ_SET: isempty = setTypeSize(o) == 0; break;
462 case OBJ_ZSET: isempty = zsetLength(o) == 0; break;
463 case OBJ_HASH : isempty = hashTypeLength(o) == 0; break;
464 default: isempty = 0;
465 }
466
467 if (isempty) {
468 dbDelete(key->db,key->key);
469 key->value = NULL;
470 return 1;
471 } else {
472 return 0;
473 }
474 }
475
476 /* --------------------------------------------------------------------------
477 * Service API exported to modules
478 *
479 * Note that all the exported APIs are called RM_<funcname> in the core
480 * and RedisModule_<funcname> in the module side (defined as function
481 * pointers in redismodule.h). In this way the dynamic linker does not
482 * mess with our global function pointers, overriding it with the symbols
483 * defined in the main executable having the same names.
484 * -------------------------------------------------------------------------- */
485
486 /* Lookup the requested module API and store the function pointer into the
487 * target pointer. The function returns REDISMODULE_ERR if there is no such
488 * named API, otherwise REDISMODULE_OK.
489 *
490 * This function is not meant to be used by modules developer, it is only
491 * used implicitly by including redismodule.h. */
RM_GetApi(const char * funcname,void ** targetPtrPtr)492 int RM_GetApi(const char *funcname, void **targetPtrPtr) {
493 dictEntry *he = dictFind(server.moduleapi, funcname);
494 if (!he) return REDISMODULE_ERR;
495 *targetPtrPtr = dictGetVal(he);
496 return REDISMODULE_OK;
497 }
498
499 /* Free the context after the user function was called. */
moduleFreeContext(RedisModuleCtx * ctx)500 void moduleFreeContext(RedisModuleCtx *ctx) {
501 autoMemoryCollect(ctx);
502 poolAllocRelease(ctx);
503 if (ctx->postponed_arrays) {
504 zfree(ctx->postponed_arrays);
505 ctx->postponed_arrays_count = 0;
506 serverLog(LL_WARNING,
507 "API misuse detected in module %s: "
508 "RedisModule_ReplyWithArray(REDISMODULE_POSTPONED_ARRAY_LEN) "
509 "not matched by the same number of RedisModule_SetReplyArrayLen() "
510 "calls.",
511 ctx->module->name);
512 }
513 if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client);
514 }
515
516 /* Helper function for when a command callback is called, in order to handle
517 * details needed to correctly replicate commands. */
moduleHandlePropagationAfterCommandCallback(RedisModuleCtx * ctx)518 void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
519 client *c = ctx->client;
520
521 if (c->flags & CLIENT_LUA) return;
522
523 /* Handle the replication of the final EXEC, since whatever a command
524 * emits is always wrapped around MULTI/EXEC. */
525 if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) {
526 robj *propargv[1];
527 propargv[0] = createStringObject("EXEC",4);
528 alsoPropagate(server.execCommand,c->db->id,propargv,1,
529 PROPAGATE_AOF|PROPAGATE_REPL);
530 decrRefCount(propargv[0]);
531 }
532 }
533
534 /* This Redis command binds the normal Redis command invocation with commands
535 * exported by modules. */
RedisModuleCommandDispatcher(client * c)536 void RedisModuleCommandDispatcher(client *c) {
537 RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
538 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
539
540 ctx.module = cp->module;
541 ctx.client = c;
542 cp->func(&ctx,(void**)c->argv,c->argc);
543 moduleHandlePropagationAfterCommandCallback(&ctx);
544 moduleFreeContext(&ctx);
545
546 /* In some cases processMultibulkBuffer uses sdsMakeRoomFor to
547 * expand the query buffer, and in order to avoid a big object copy
548 * the query buffer SDS may be used directly as the SDS string backing
549 * the client argument vectors: sometimes this will result in the SDS
550 * string having unused space at the end. Later if a module takes ownership
551 * of the RedisString, such space will be wasted forever. Inside the
552 * Redis core this is not a problem because tryObjectEncoding() is called
553 * before storing strings in the key space. Here we need to do it
554 * for the module. */
555 for (int i = 0; i < c->argc; i++) {
556 /* Only do the work if the module took ownership of the object:
557 * in that case the refcount is no longer 1. */
558 if (c->argv[i]->refcount > 1)
559 trimStringObjectIfNeeded(c->argv[i]);
560 }
561 }
562
563 /* This function returns the list of keys, with the same interface as the
564 * 'getkeys' function of the native commands, for module commands that exported
565 * the "getkeys-api" flag during the registration. This is done when the
566 * list of keys are not at fixed positions, so that first/last/step cannot
567 * be used.
568 *
569 * In order to accomplish its work, the module command is called, flagging
570 * the context in a way that the command can recognize this is a special
571 * "get keys" call by calling RedisModule_IsKeysPositionRequest(ctx). */
moduleGetCommandKeysViaAPI(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)572 int *moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
573 RedisModuleCommandProxy *cp = (void*)(unsigned long)cmd->getkeys_proc;
574 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
575
576 ctx.module = cp->module;
577 ctx.client = NULL;
578 ctx.flags |= REDISMODULE_CTX_KEYS_POS_REQUEST;
579 cp->func(&ctx,(void**)argv,argc);
580 int *res = ctx.keys_pos;
581 if (numkeys) *numkeys = ctx.keys_count;
582 moduleFreeContext(&ctx);
583 return res;
584 }
585
586 /* Return non-zero if a module command, that was declared with the
587 * flag "getkeys-api", is called in a special way to get the keys positions
588 * and not to get executed. Otherwise zero is returned. */
RM_IsKeysPositionRequest(RedisModuleCtx * ctx)589 int RM_IsKeysPositionRequest(RedisModuleCtx *ctx) {
590 return (ctx->flags & REDISMODULE_CTX_KEYS_POS_REQUEST) != 0;
591 }
592
593 /* When a module command is called in order to obtain the position of
594 * keys, since it was flagged as "getkeys-api" during the registration,
595 * the command implementation checks for this special call using the
596 * RedisModule_IsKeysPositionRequest() API and uses this function in
597 * order to report keys, like in the following example:
598 *
599 * if (RedisModule_IsKeysPositionRequest(ctx)) {
600 * RedisModule_KeyAtPos(ctx,1);
601 * RedisModule_KeyAtPos(ctx,2);
602 * }
603 *
604 * Note: in the example below the get keys API would not be needed since
605 * keys are at fixed positions. This interface is only used for commands
606 * with a more complex structure. */
RM_KeyAtPos(RedisModuleCtx * ctx,int pos)607 void RM_KeyAtPos(RedisModuleCtx *ctx, int pos) {
608 if (!(ctx->flags & REDISMODULE_CTX_KEYS_POS_REQUEST)) return;
609 if (pos <= 0) return;
610 ctx->keys_pos = zrealloc(ctx->keys_pos,sizeof(int)*(ctx->keys_count+1));
611 ctx->keys_pos[ctx->keys_count++] = pos;
612 }
613
614 /* Helper for RM_CreateCommand(). Turns a string representing command
615 * flags into the command flags used by the Redis core.
616 *
617 * It returns the set of flags, or -1 if unknown flags are found. */
commandFlagsFromString(char * s)618 int commandFlagsFromString(char *s) {
619 int count, j;
620 int flags = 0;
621 sds *tokens = sdssplitlen(s,strlen(s)," ",1,&count);
622 for (j = 0; j < count; j++) {
623 char *t = tokens[j];
624 if (!strcasecmp(t,"write")) flags |= CMD_WRITE;
625 else if (!strcasecmp(t,"readonly")) flags |= CMD_READONLY;
626 else if (!strcasecmp(t,"admin")) flags |= CMD_ADMIN;
627 else if (!strcasecmp(t,"deny-oom")) flags |= CMD_DENYOOM;
628 else if (!strcasecmp(t,"deny-script")) flags |= CMD_NOSCRIPT;
629 else if (!strcasecmp(t,"allow-loading")) flags |= CMD_LOADING;
630 else if (!strcasecmp(t,"pubsub")) flags |= CMD_PUBSUB;
631 else if (!strcasecmp(t,"random")) flags |= CMD_RANDOM;
632 else if (!strcasecmp(t,"allow-stale")) flags |= CMD_STALE;
633 else if (!strcasecmp(t,"no-monitor")) flags |= CMD_SKIP_MONITOR;
634 else if (!strcasecmp(t,"fast")) flags |= CMD_FAST;
635 else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS;
636 else if (!strcasecmp(t,"no-cluster")) flags |= CMD_MODULE_NO_CLUSTER;
637 else break;
638 }
639 sdsfreesplitres(tokens,count);
640 if (j != count) return -1; /* Some token not processed correctly. */
641 return flags;
642 }
643
644 /* Register a new command in the Redis server, that will be handled by
645 * calling the function pointer 'func' using the RedisModule calling
646 * convention. The function returns REDISMODULE_ERR if the specified command
647 * name is already busy or a set of invalid flags were passed, otherwise
648 * REDISMODULE_OK is returned and the new command is registered.
649 *
650 * This function must be called during the initialization of the module
651 * inside the RedisModule_OnLoad() function. Calling this function outside
652 * of the initialization function is not defined.
653 *
654 * The command function type is the following:
655 *
656 * int MyCommand_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
657 *
658 * And is supposed to always return REDISMODULE_OK.
659 *
660 * The set of flags 'strflags' specify the behavior of the command, and should
661 * be passed as a C string composed of space separated words, like for
662 * example "write deny-oom". The set of flags are:
663 *
664 * * **"write"**: The command may modify the data set (it may also read
665 * from it).
666 * * **"readonly"**: The command returns data from keys but never writes.
667 * * **"admin"**: The command is an administrative command (may change
668 * replication or perform similar tasks).
669 * * **"deny-oom"**: The command may use additional memory and should be
670 * denied during out of memory conditions.
671 * * **"deny-script"**: Don't allow this command in Lua scripts.
672 * * **"allow-loading"**: Allow this command while the server is loading data.
673 * Only commands not interacting with the data set
674 * should be allowed to run in this mode. If not sure
675 * don't use this flag.
676 * * **"pubsub"**: The command publishes things on Pub/Sub channels.
677 * * **"random"**: The command may have different outputs even starting
678 * from the same input arguments and key values.
679 * * **"allow-stale"**: The command is allowed to run on slaves that don't
680 * serve stale data. Don't use if you don't know what
681 * this means.
682 * * **"no-monitor"**: Don't propagate the command on monitor. Use this if
683 * the command has sensible data among the arguments.
684 * * **"fast"**: The command time complexity is not greater
685 * than O(log(N)) where N is the size of the collection or
686 * anything else representing the normal scalability
687 * issue with the command.
688 * * **"getkeys-api"**: The command implements the interface to return
689 * the arguments that are keys. Used when start/stop/step
690 * is not enough because of the command syntax.
691 * * **"no-cluster"**: The command should not register in Redis Cluster
692 * since is not designed to work with it because, for
693 * example, is unable to report the position of the
694 * keys, programmatically creates key names, or any
695 * other reason.
696 */
RM_CreateCommand(RedisModuleCtx * ctx,const char * name,RedisModuleCmdFunc cmdfunc,const char * strflags,int firstkey,int lastkey,int keystep)697 int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) {
698 int flags = strflags ? commandFlagsFromString((char*)strflags) : 0;
699 if (flags == -1) return REDISMODULE_ERR;
700 if ((flags & CMD_MODULE_NO_CLUSTER) && server.cluster_enabled)
701 return REDISMODULE_ERR;
702
703 struct redisCommand *rediscmd;
704 RedisModuleCommandProxy *cp;
705 sds cmdname = sdsnew(name);
706
707 /* Check if the command name is busy. */
708 if (lookupCommand(cmdname) != NULL) {
709 sdsfree(cmdname);
710 return REDISMODULE_ERR;
711 }
712
713 /* Create a command "proxy", which is a structure that is referenced
714 * in the command table, so that the generic command that works as
715 * binding between modules and Redis, can know what function to call
716 * and what the module is.
717 *
718 * Note that we use the Redis command table 'getkeys_proc' in order to
719 * pass a reference to the command proxy structure. */
720 cp = zmalloc(sizeof(*cp));
721 cp->module = ctx->module;
722 cp->func = cmdfunc;
723 cp->rediscmd = zmalloc(sizeof(*rediscmd));
724 cp->rediscmd->name = cmdname;
725 cp->rediscmd->proc = RedisModuleCommandDispatcher;
726 cp->rediscmd->arity = -1;
727 cp->rediscmd->flags = flags | CMD_MODULE;
728 cp->rediscmd->getkeys_proc = (redisGetKeysProc*)(unsigned long)cp;
729 cp->rediscmd->firstkey = firstkey;
730 cp->rediscmd->lastkey = lastkey;
731 cp->rediscmd->keystep = keystep;
732 cp->rediscmd->microseconds = 0;
733 cp->rediscmd->calls = 0;
734 dictAdd(server.commands,sdsdup(cmdname),cp->rediscmd);
735 dictAdd(server.orig_commands,sdsdup(cmdname),cp->rediscmd);
736 return REDISMODULE_OK;
737 }
738
739 /* Called by RM_Init() to setup the `ctx->module` structure.
740 *
741 * This is an internal function, Redis modules developers don't need
742 * to use it. */
RM_SetModuleAttribs(RedisModuleCtx * ctx,const char * name,int ver,int apiver)743 void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
744 RedisModule *module;
745
746 if (ctx->module != NULL) return;
747 module = zmalloc(sizeof(*module));
748 module->name = sdsnew((char*)name);
749 module->ver = ver;
750 module->apiver = apiver;
751 module->types = listCreate();
752 module->usedby = listCreate();
753 module->using = listCreate();
754 module->filters = listCreate();
755 module->in_call = 0;
756 ctx->module = module;
757 }
758
759 /* Return non-zero if the module name is busy.
760 * Otherwise zero is returned. */
RM_IsModuleNameBusy(const char * name)761 int RM_IsModuleNameBusy(const char *name) {
762 sds modulename = sdsnew(name);
763 dictEntry *de = dictFind(modules,modulename);
764 sdsfree(modulename);
765 return de != NULL;
766 }
767
768 /* Return the current UNIX time in milliseconds. */
RM_Milliseconds(void)769 long long RM_Milliseconds(void) {
770 return mstime();
771 }
772
773 /* --------------------------------------------------------------------------
774 * Automatic memory management for modules
775 * -------------------------------------------------------------------------- */
776
777 /* Enable automatic memory management. See API.md for more information.
778 *
779 * The function must be called as the first function of a command implementation
780 * that wants to use automatic memory. */
RM_AutoMemory(RedisModuleCtx * ctx)781 void RM_AutoMemory(RedisModuleCtx *ctx) {
782 ctx->flags |= REDISMODULE_CTX_AUTO_MEMORY;
783 }
784
785 /* Add a new object to release automatically when the callback returns. */
autoMemoryAdd(RedisModuleCtx * ctx,int type,void * ptr)786 void autoMemoryAdd(RedisModuleCtx *ctx, int type, void *ptr) {
787 if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return;
788 if (ctx->amqueue_used == ctx->amqueue_len) {
789 ctx->amqueue_len *= 2;
790 if (ctx->amqueue_len < 16) ctx->amqueue_len = 16;
791 ctx->amqueue = zrealloc(ctx->amqueue,sizeof(struct AutoMemEntry)*ctx->amqueue_len);
792 }
793 ctx->amqueue[ctx->amqueue_used].type = type;
794 ctx->amqueue[ctx->amqueue_used].ptr = ptr;
795 ctx->amqueue_used++;
796 }
797
798 /* Mark an object as freed in the auto release queue, so that users can still
799 * free things manually if they want.
800 *
801 * The function returns 1 if the object was actually found in the auto memory
802 * pool, otherwise 0 is returned. */
autoMemoryFreed(RedisModuleCtx * ctx,int type,void * ptr)803 int autoMemoryFreed(RedisModuleCtx *ctx, int type, void *ptr) {
804 if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return 0;
805
806 int count = (ctx->amqueue_used+1)/2;
807 for (int j = 0; j < count; j++) {
808 for (int side = 0; side < 2; side++) {
809 /* For side = 0 check right side of the array, for
810 * side = 1 check the left side instead (zig-zag scanning). */
811 int i = (side == 0) ? (ctx->amqueue_used - 1 - j) : j;
812 if (ctx->amqueue[i].type == type &&
813 ctx->amqueue[i].ptr == ptr)
814 {
815 ctx->amqueue[i].type = REDISMODULE_AM_FREED;
816
817 /* Switch the freed element and the last element, to avoid growing
818 * the queue unnecessarily if we allocate/free in a loop */
819 if (i != ctx->amqueue_used-1) {
820 ctx->amqueue[i] = ctx->amqueue[ctx->amqueue_used-1];
821 }
822
823 /* Reduce the size of the queue because we either moved the top
824 * element elsewhere or freed it */
825 ctx->amqueue_used--;
826 return 1;
827 }
828 }
829 }
830 return 0;
831 }
832
833 /* Release all the objects in queue. */
autoMemoryCollect(RedisModuleCtx * ctx)834 void autoMemoryCollect(RedisModuleCtx *ctx) {
835 if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return;
836 /* Clear the AUTO_MEMORY flag from the context, otherwise the functions
837 * we call to free the resources, will try to scan the auto release
838 * queue to mark the entries as freed. */
839 ctx->flags &= ~REDISMODULE_CTX_AUTO_MEMORY;
840 int j;
841 for (j = 0; j < ctx->amqueue_used; j++) {
842 void *ptr = ctx->amqueue[j].ptr;
843 switch(ctx->amqueue[j].type) {
844 case REDISMODULE_AM_STRING: decrRefCount(ptr); break;
845 case REDISMODULE_AM_REPLY: RM_FreeCallReply(ptr); break;
846 case REDISMODULE_AM_KEY: RM_CloseKey(ptr); break;
847 case REDISMODULE_AM_DICT: RM_FreeDict(NULL,ptr); break;
848 }
849 }
850 ctx->flags |= REDISMODULE_CTX_AUTO_MEMORY;
851 zfree(ctx->amqueue);
852 ctx->amqueue = NULL;
853 ctx->amqueue_len = 0;
854 ctx->amqueue_used = 0;
855 }
856
857 /* --------------------------------------------------------------------------
858 * String objects APIs
859 * -------------------------------------------------------------------------- */
860
861 /* Create a new module string object. The returned string must be freed
862 * with RedisModule_FreeString(), unless automatic memory is enabled.
863 *
864 * The string is created by copying the `len` bytes starting
865 * at `ptr`. No reference is retained to the passed buffer.
866 *
867 * The module context 'ctx' is optional and may be NULL if you want to create
868 * a string out of the context scope. However in that case, the automatic
869 * memory management will not be available, and the string memory must be
870 * managed manually. */
RM_CreateString(RedisModuleCtx * ctx,const char * ptr,size_t len)871 RedisModuleString *RM_CreateString(RedisModuleCtx *ctx, const char *ptr, size_t len) {
872 RedisModuleString *o = createStringObject(ptr,len);
873 if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
874 return o;
875 }
876
877 /* Create a new module string object from a printf format and arguments.
878 * The returned string must be freed with RedisModule_FreeString(), unless
879 * automatic memory is enabled.
880 *
881 * The string is created using the sds formatter function sdscatvprintf().
882 *
883 * The passed context 'ctx' may be NULL if necessary, see the
884 * RedisModule_CreateString() documentation for more info. */
RM_CreateStringPrintf(RedisModuleCtx * ctx,const char * fmt,...)885 RedisModuleString *RM_CreateStringPrintf(RedisModuleCtx *ctx, const char *fmt, ...) {
886 sds s = sdsempty();
887
888 va_list ap;
889 va_start(ap, fmt);
890 s = sdscatvprintf(s, fmt, ap);
891 va_end(ap);
892
893 RedisModuleString *o = createObject(OBJ_STRING, s);
894 if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
895
896 return o;
897 }
898
899
900 /* Like RedisModule_CreatString(), but creates a string starting from a long long
901 * integer instead of taking a buffer and its length.
902 *
903 * The returned string must be released with RedisModule_FreeString() or by
904 * enabling automatic memory management.
905 *
906 * The passed context 'ctx' may be NULL if necessary, see the
907 * RedisModule_CreateString() documentation for more info. */
RM_CreateStringFromLongLong(RedisModuleCtx * ctx,long long ll)908 RedisModuleString *RM_CreateStringFromLongLong(RedisModuleCtx *ctx, long long ll) {
909 char buf[LONG_STR_SIZE];
910 size_t len = ll2string(buf,sizeof(buf),ll);
911 return RM_CreateString(ctx,buf,len);
912 }
913
914 /* Like RedisModule_CreatString(), but creates a string starting from another
915 * RedisModuleString.
916 *
917 * The returned string must be released with RedisModule_FreeString() or by
918 * enabling automatic memory management.
919 *
920 * The passed context 'ctx' may be NULL if necessary, see the
921 * RedisModule_CreateString() documentation for more info. */
RM_CreateStringFromString(RedisModuleCtx * ctx,const RedisModuleString * str)922 RedisModuleString *RM_CreateStringFromString(RedisModuleCtx *ctx, const RedisModuleString *str) {
923 RedisModuleString *o = dupStringObject(str);
924 if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
925 return o;
926 }
927
928 /* Free a module string object obtained with one of the Redis modules API calls
929 * that return new string objects.
930 *
931 * It is possible to call this function even when automatic memory management
932 * is enabled. In that case the string will be released ASAP and removed
933 * from the pool of string to release at the end.
934 *
935 * If the string was created with a NULL context 'ctx', it is also possible to
936 * pass ctx as NULL when releasing the string (but passing a context will not
937 * create any issue). Strings created with a context should be freed also passing
938 * the context, so if you want to free a string out of context later, make sure
939 * to create it using a NULL context. */
RM_FreeString(RedisModuleCtx * ctx,RedisModuleString * str)940 void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) {
941 decrRefCount(str);
942 if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str);
943 }
944
945 /* Every call to this function, will make the string 'str' requiring
946 * an additional call to RedisModule_FreeString() in order to really
947 * free the string. Note that the automatic freeing of the string obtained
948 * enabling modules automatic memory management counts for one
949 * RedisModule_FreeString() call (it is just executed automatically).
950 *
951 * Normally you want to call this function when, at the same time
952 * the following conditions are true:
953 *
954 * 1) You have automatic memory management enabled.
955 * 2) You want to create string objects.
956 * 3) Those string objects you create need to live *after* the callback
957 * function(for example a command implementation) creating them returns.
958 *
959 * Usually you want this in order to store the created string object
960 * into your own data structure, for example when implementing a new data
961 * type.
962 *
963 * Note that when memory management is turned off, you don't need
964 * any call to RetainString() since creating a string will always result
965 * into a string that lives after the callback function returns, if
966 * no FreeString() call is performed.
967 *
968 * It is possible to call this function with a NULL context. */
RM_RetainString(RedisModuleCtx * ctx,RedisModuleString * str)969 void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) {
970 if (ctx == NULL || !autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str)) {
971 /* Increment the string reference counting only if we can't
972 * just remove the object from the list of objects that should
973 * be reclaimed. Why we do that, instead of just incrementing
974 * the refcount in any case, and let the automatic FreeString()
975 * call at the end to bring the refcount back at the desired
976 * value? Because this way we ensure that the object refcount
977 * value is 1 (instead of going to 2 to be dropped later to 1)
978 * after the call to this function. This is needed for functions
979 * like RedisModule_StringAppendBuffer() to work. */
980 incrRefCount(str);
981 }
982 }
983
984 /* Given a string module object, this function returns the string pointer
985 * and length of the string. The returned pointer and length should only
986 * be used for read only accesses and never modified. */
RM_StringPtrLen(const RedisModuleString * str,size_t * len)987 const char *RM_StringPtrLen(const RedisModuleString *str, size_t *len) {
988 if (str == NULL) {
989 const char *errmsg = "(NULL string reply referenced in module)";
990 if (len) *len = strlen(errmsg);
991 return errmsg;
992 }
993 if (len) *len = sdslen(str->ptr);
994 return str->ptr;
995 }
996
997 /* --------------------------------------------------------------------------
998 * Higher level string operations
999 * ------------------------------------------------------------------------- */
1000
1001 /* Convert the string into a long long integer, storing it at `*ll`.
1002 * Returns REDISMODULE_OK on success. If the string can't be parsed
1003 * as a valid, strict long long (no spaces before/after), REDISMODULE_ERR
1004 * is returned. */
RM_StringToLongLong(const RedisModuleString * str,long long * ll)1005 int RM_StringToLongLong(const RedisModuleString *str, long long *ll) {
1006 return string2ll(str->ptr,sdslen(str->ptr),ll) ? REDISMODULE_OK :
1007 REDISMODULE_ERR;
1008 }
1009
1010 /* Convert the string into a double, storing it at `*d`.
1011 * Returns REDISMODULE_OK on success or REDISMODULE_ERR if the string is
1012 * not a valid string representation of a double value. */
RM_StringToDouble(const RedisModuleString * str,double * d)1013 int RM_StringToDouble(const RedisModuleString *str, double *d) {
1014 int retval = getDoubleFromObject(str,d);
1015 return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
1016 }
1017
1018 /* Compare two string objects, returning -1, 0 or 1 respectively if
1019 * a < b, a == b, a > b. Strings are compared byte by byte as two
1020 * binary blobs without any encoding care / collation attempt. */
RM_StringCompare(RedisModuleString * a,RedisModuleString * b)1021 int RM_StringCompare(RedisModuleString *a, RedisModuleString *b) {
1022 return compareStringObjects(a,b);
1023 }
1024
1025 /* Return the (possibly modified in encoding) input 'str' object if
1026 * the string is unshared, otherwise NULL is returned. */
moduleAssertUnsharedString(RedisModuleString * str)1027 RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) {
1028 if (str->refcount != 1) {
1029 serverLog(LL_WARNING,
1030 "Module attempted to use an in-place string modify operation "
1031 "with a string referenced multiple times. Please check the code "
1032 "for API usage correctness.");
1033 return NULL;
1034 }
1035 if (str->encoding == OBJ_ENCODING_EMBSTR) {
1036 /* Note: here we "leak" the additional allocation that was
1037 * used in order to store the embedded string in the object. */
1038 str->ptr = sdsnewlen(str->ptr,sdslen(str->ptr));
1039 str->encoding = OBJ_ENCODING_RAW;
1040 } else if (str->encoding == OBJ_ENCODING_INT) {
1041 /* Convert the string from integer to raw encoding. */
1042 str->ptr = sdsfromlonglong((long)str->ptr);
1043 str->encoding = OBJ_ENCODING_RAW;
1044 }
1045 return str;
1046 }
1047
1048 /* Append the specified buffer to the string 'str'. The string must be a
1049 * string created by the user that is referenced only a single time, otherwise
1050 * REDISMODULE_ERR is returned and the operation is not performed. */
RM_StringAppendBuffer(RedisModuleCtx * ctx,RedisModuleString * str,const char * buf,size_t len)1051 int RM_StringAppendBuffer(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len) {
1052 UNUSED(ctx);
1053 str = moduleAssertUnsharedString(str);
1054 if (str == NULL) return REDISMODULE_ERR;
1055 str->ptr = sdscatlen(str->ptr,buf,len);
1056 return REDISMODULE_OK;
1057 }
1058
1059 /* --------------------------------------------------------------------------
1060 * Reply APIs
1061 *
1062 * Most functions always return REDISMODULE_OK so you can use it with
1063 * 'return' in order to return from the command implementation with:
1064 *
1065 * if (... some condition ...)
1066 * return RM_ReplyWithLongLong(ctx,mycount);
1067 * -------------------------------------------------------------------------- */
1068
1069 /* Send an error about the number of arguments given to the command,
1070 * citing the command name in the error message.
1071 *
1072 * Example:
1073 *
1074 * if (argc != 3) return RedisModule_WrongArity(ctx);
1075 */
RM_WrongArity(RedisModuleCtx * ctx)1076 int RM_WrongArity(RedisModuleCtx *ctx) {
1077 addReplyErrorFormat(ctx->client,
1078 "wrong number of arguments for '%s' command",
1079 (char*)ctx->client->argv[0]->ptr);
1080 return REDISMODULE_OK;
1081 }
1082
1083 /* Return the client object the `RM_Reply*` functions should target.
1084 * Normally this is just `ctx->client`, that is the client that called
1085 * the module command, however in the case of thread safe contexts there
1086 * is no directly associated client (since it would not be safe to access
1087 * the client from a thread), so instead the blocked client object referenced
1088 * in the thread safe context, has a fake client that we just use to accumulate
1089 * the replies. Later, when the client is unblocked, the accumulated replies
1090 * are appended to the actual client.
1091 *
1092 * The function returns the client pointer depending on the context, or
1093 * NULL if there is no potential client. This happens when we are in the
1094 * context of a thread safe context that was not initialized with a blocked
1095 * client object. Other contexts without associated clients are the ones
1096 * initialized to run the timers callbacks. */
moduleGetReplyClient(RedisModuleCtx * ctx)1097 client *moduleGetReplyClient(RedisModuleCtx *ctx) {
1098 if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) {
1099 if (ctx->blocked_client)
1100 return ctx->blocked_client->reply_client;
1101 else
1102 return NULL;
1103 } else {
1104 /* If this is a non thread safe context, just return the client
1105 * that is running the command if any. This may be NULL as well
1106 * in the case of contexts that are not executed with associated
1107 * clients, like timer contexts. */
1108 return ctx->client;
1109 }
1110 }
1111
1112 /* Send an integer reply to the client, with the specified long long value.
1113 * The function always returns REDISMODULE_OK. */
RM_ReplyWithLongLong(RedisModuleCtx * ctx,long long ll)1114 int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
1115 client *c = moduleGetReplyClient(ctx);
1116 if (c == NULL) return REDISMODULE_OK;
1117 addReplyLongLong(c,ll);
1118 return REDISMODULE_OK;
1119 }
1120
1121 /* Reply with an error or simple string (status message). Used to implement
1122 * ReplyWithSimpleString() and ReplyWithError().
1123 * The function always returns REDISMODULE_OK. */
replyWithStatus(RedisModuleCtx * ctx,const char * msg,char * prefix)1124 int replyWithStatus(RedisModuleCtx *ctx, const char *msg, char *prefix) {
1125 client *c = moduleGetReplyClient(ctx);
1126 if (c == NULL) return REDISMODULE_OK;
1127 sds strmsg = sdsnewlen(prefix,1);
1128 strmsg = sdscat(strmsg,msg);
1129 strmsg = sdscatlen(strmsg,"\r\n",2);
1130 addReplySds(c,strmsg);
1131 return REDISMODULE_OK;
1132 }
1133
1134 /* Reply with the error 'err'.
1135 *
1136 * Note that 'err' must contain all the error, including
1137 * the initial error code. The function only provides the initial "-", so
1138 * the usage is, for example:
1139 *
1140 * RedisModule_ReplyWithError(ctx,"ERR Wrong Type");
1141 *
1142 * and not just:
1143 *
1144 * RedisModule_ReplyWithError(ctx,"Wrong Type");
1145 *
1146 * The function always returns REDISMODULE_OK.
1147 */
RM_ReplyWithError(RedisModuleCtx * ctx,const char * err)1148 int RM_ReplyWithError(RedisModuleCtx *ctx, const char *err) {
1149 return replyWithStatus(ctx,err,"-");
1150 }
1151
1152 /* Reply with a simple string (+... \r\n in RESP protocol). This replies
1153 * are suitable only when sending a small non-binary string with small
1154 * overhead, like "OK" or similar replies.
1155 *
1156 * The function always returns REDISMODULE_OK. */
RM_ReplyWithSimpleString(RedisModuleCtx * ctx,const char * msg)1157 int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) {
1158 return replyWithStatus(ctx,msg,"+");
1159 }
1160
1161 /* Reply with an array type of 'len' elements. However 'len' other calls
1162 * to `ReplyWith*` style functions must follow in order to emit the elements
1163 * of the array.
1164 *
1165 * When producing arrays with a number of element that is not known beforehand
1166 * the function can be called with the special count
1167 * REDISMODULE_POSTPONED_ARRAY_LEN, and the actual number of elements can be
1168 * later set with RedisModule_ReplySetArrayLength() (which will set the
1169 * latest "open" count if there are multiple ones).
1170 *
1171 * The function always returns REDISMODULE_OK. */
RM_ReplyWithArray(RedisModuleCtx * ctx,long len)1172 int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
1173 client *c = moduleGetReplyClient(ctx);
1174 if (c == NULL) return REDISMODULE_OK;
1175 if (len == REDISMODULE_POSTPONED_ARRAY_LEN) {
1176 ctx->postponed_arrays = zrealloc(ctx->postponed_arrays,sizeof(void*)*
1177 (ctx->postponed_arrays_count+1));
1178 ctx->postponed_arrays[ctx->postponed_arrays_count] =
1179 addDeferredMultiBulkLength(c);
1180 ctx->postponed_arrays_count++;
1181 } else {
1182 addReplyMultiBulkLen(c,len);
1183 }
1184 return REDISMODULE_OK;
1185 }
1186
1187 /* When RedisModule_ReplyWithArray() is used with the argument
1188 * REDISMODULE_POSTPONED_ARRAY_LEN, because we don't know beforehand the number
1189 * of items we are going to output as elements of the array, this function
1190 * will take care to set the array length.
1191 *
1192 * Since it is possible to have multiple array replies pending with unknown
1193 * length, this function guarantees to always set the latest array length
1194 * that was created in a postponed way.
1195 *
1196 * For example in order to output an array like [1,[10,20,30]] we
1197 * could write:
1198 *
1199 * RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN);
1200 * RedisModule_ReplyWithLongLong(ctx,1);
1201 * RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN);
1202 * RedisModule_ReplyWithLongLong(ctx,10);
1203 * RedisModule_ReplyWithLongLong(ctx,20);
1204 * RedisModule_ReplyWithLongLong(ctx,30);
1205 * RedisModule_ReplySetArrayLength(ctx,3); // Set len of 10,20,30 array.
1206 * RedisModule_ReplySetArrayLength(ctx,2); // Set len of top array
1207 *
1208 * Note that in the above example there is no reason to postpone the array
1209 * length, since we produce a fixed number of elements, but in the practice
1210 * the code may use an iterator or other ways of creating the output so
1211 * that is not easy to calculate in advance the number of elements.
1212 */
RM_ReplySetArrayLength(RedisModuleCtx * ctx,long len)1213 void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
1214 client *c = moduleGetReplyClient(ctx);
1215 if (c == NULL) return;
1216 if (ctx->postponed_arrays_count == 0) {
1217 serverLog(LL_WARNING,
1218 "API misuse detected in module %s: "
1219 "RedisModule_ReplySetArrayLength() called without previous "
1220 "RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN) "
1221 "call.", ctx->module->name);
1222 return;
1223 }
1224 ctx->postponed_arrays_count--;
1225 setDeferredMultiBulkLength(c,
1226 ctx->postponed_arrays[ctx->postponed_arrays_count],
1227 len);
1228 if (ctx->postponed_arrays_count == 0) {
1229 zfree(ctx->postponed_arrays);
1230 ctx->postponed_arrays = NULL;
1231 }
1232 }
1233
1234 /* Reply with a bulk string, taking in input a C buffer pointer and length.
1235 *
1236 * The function always returns REDISMODULE_OK. */
RM_ReplyWithStringBuffer(RedisModuleCtx * ctx,const char * buf,size_t len)1237 int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
1238 client *c = moduleGetReplyClient(ctx);
1239 if (c == NULL) return REDISMODULE_OK;
1240 addReplyBulkCBuffer(c,(char*)buf,len);
1241 return REDISMODULE_OK;
1242 }
1243
1244 /* Reply with a bulk string, taking in input a RedisModuleString object.
1245 *
1246 * The function always returns REDISMODULE_OK. */
RM_ReplyWithString(RedisModuleCtx * ctx,RedisModuleString * str)1247 int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
1248 client *c = moduleGetReplyClient(ctx);
1249 if (c == NULL) return REDISMODULE_OK;
1250 addReplyBulk(c,str);
1251 return REDISMODULE_OK;
1252 }
1253
1254 /* Reply to the client with a NULL. In the RESP protocol a NULL is encoded
1255 * as the string "$-1\r\n".
1256 *
1257 * The function always returns REDISMODULE_OK. */
RM_ReplyWithNull(RedisModuleCtx * ctx)1258 int RM_ReplyWithNull(RedisModuleCtx *ctx) {
1259 client *c = moduleGetReplyClient(ctx);
1260 if (c == NULL) return REDISMODULE_OK;
1261 addReply(c,shared.nullbulk);
1262 return REDISMODULE_OK;
1263 }
1264
1265 /* Reply exactly what a Redis command returned us with RedisModule_Call().
1266 * This function is useful when we use RedisModule_Call() in order to
1267 * execute some command, as we want to reply to the client exactly the
1268 * same reply we obtained by the command.
1269 *
1270 * The function always returns REDISMODULE_OK. */
RM_ReplyWithCallReply(RedisModuleCtx * ctx,RedisModuleCallReply * reply)1271 int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
1272 client *c = moduleGetReplyClient(ctx);
1273 if (c == NULL) return REDISMODULE_OK;
1274 sds proto = sdsnewlen(reply->proto, reply->protolen);
1275 addReplySds(c,proto);
1276 return REDISMODULE_OK;
1277 }
1278
1279 /* Send a string reply obtained converting the double 'd' into a bulk string.
1280 * This function is basically equivalent to converting a double into
1281 * a string into a C buffer, and then calling the function
1282 * RedisModule_ReplyWithStringBuffer() with the buffer and length.
1283 *
1284 * The function always returns REDISMODULE_OK. */
RM_ReplyWithDouble(RedisModuleCtx * ctx,double d)1285 int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
1286 client *c = moduleGetReplyClient(ctx);
1287 if (c == NULL) return REDISMODULE_OK;
1288 addReplyDouble(c,d);
1289 return REDISMODULE_OK;
1290 }
1291
1292 /* --------------------------------------------------------------------------
1293 * Commands replication API
1294 * -------------------------------------------------------------------------- */
1295
1296 /* Helper function to replicate MULTI the first time we replicate something
1297 * in the context of a command execution. EXEC will be handled by the
1298 * RedisModuleCommandDispatcher() function. */
moduleReplicateMultiIfNeeded(RedisModuleCtx * ctx)1299 void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
1300 /* Skip this if client explicitly wrap the command with MULTI, or if
1301 * the module command was called by a script. */
1302 if (ctx->client->flags & (CLIENT_MULTI|CLIENT_LUA)) return;
1303 /* If we already emitted MULTI return ASAP. */
1304 if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) return;
1305 /* If this is a thread safe context, we do not want to wrap commands
1306 * executed into MUTLI/EXEC, they are executed as single commands
1307 * from an external client in essence. */
1308 if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) return;
1309 execCommandPropagateMulti(ctx->client);
1310 ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED;
1311 }
1312
1313 /* Replicate the specified command and arguments to slaves and AOF, as effect
1314 * of execution of the calling command implementation.
1315 *
1316 * The replicated commands are always wrapped into the MULTI/EXEC that
1317 * contains all the commands replicated in a given module command
1318 * execution. However the commands replicated with RedisModule_Call()
1319 * are the first items, the ones replicated with RedisModule_Replicate()
1320 * will all follow before the EXEC.
1321 *
1322 * Modules should try to use one interface or the other.
1323 *
1324 * This command follows exactly the same interface of RedisModule_Call(),
1325 * so a set of format specifiers must be passed, followed by arguments
1326 * matching the provided format specifiers.
1327 *
1328 * Please refer to RedisModule_Call() for more information.
1329 *
1330 * The command returns REDISMODULE_ERR if the format specifiers are invalid
1331 * or the command name does not belong to a known command. */
RM_Replicate(RedisModuleCtx * ctx,const char * cmdname,const char * fmt,...)1332 int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
1333 struct redisCommand *cmd;
1334 robj **argv = NULL;
1335 int argc = 0, flags = 0, j;
1336 va_list ap;
1337
1338 cmd = lookupCommandByCString((char*)cmdname);
1339 if (!cmd) return REDISMODULE_ERR;
1340
1341 /* Create the client and dispatch the command. */
1342 va_start(ap, fmt);
1343 argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
1344 va_end(ap);
1345 if (argv == NULL) return REDISMODULE_ERR;
1346
1347 /* Replicate! */
1348 moduleReplicateMultiIfNeeded(ctx);
1349 alsoPropagate(cmd,ctx->client->db->id,argv,argc,
1350 PROPAGATE_AOF|PROPAGATE_REPL);
1351
1352 /* Release the argv. */
1353 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
1354 zfree(argv);
1355 server.dirty++;
1356 return REDISMODULE_OK;
1357 }
1358
1359 /* This function will replicate the command exactly as it was invoked
1360 * by the client. Note that this function will not wrap the command into
1361 * a MULTI/EXEC stanza, so it should not be mixed with other replication
1362 * commands.
1363 *
1364 * Basically this form of replication is useful when you want to propagate
1365 * the command to the slaves and AOF file exactly as it was called, since
1366 * the command can just be re-executed to deterministically re-create the
1367 * new state starting from the old one.
1368 *
1369 * The function always returns REDISMODULE_OK. */
RM_ReplicateVerbatim(RedisModuleCtx * ctx)1370 int RM_ReplicateVerbatim(RedisModuleCtx *ctx) {
1371 alsoPropagate(ctx->client->cmd,ctx->client->db->id,
1372 ctx->client->argv,ctx->client->argc,
1373 PROPAGATE_AOF|PROPAGATE_REPL);
1374 server.dirty++;
1375 return REDISMODULE_OK;
1376 }
1377
1378 /* --------------------------------------------------------------------------
1379 * DB and Key APIs -- Generic API
1380 * -------------------------------------------------------------------------- */
1381
1382 /* Return the ID of the current client calling the currently active module
1383 * command. The returned ID has a few guarantees:
1384 *
1385 * 1. The ID is different for each different client, so if the same client
1386 * executes a module command multiple times, it can be recognized as
1387 * having the same ID, otherwise the ID will be different.
1388 * 2. The ID increases monotonically. Clients connecting to the server later
1389 * are guaranteed to get IDs greater than any past ID previously seen.
1390 *
1391 * Valid IDs are from 1 to 2^64-1. If 0 is returned it means there is no way
1392 * to fetch the ID in the context the function was currently called. */
RM_GetClientId(RedisModuleCtx * ctx)1393 unsigned long long RM_GetClientId(RedisModuleCtx *ctx) {
1394 if (ctx->client == NULL) return 0;
1395 return ctx->client->id;
1396 }
1397
1398 /* Return the currently selected DB. */
RM_GetSelectedDb(RedisModuleCtx * ctx)1399 int RM_GetSelectedDb(RedisModuleCtx *ctx) {
1400 return ctx->client->db->id;
1401 }
1402
1403
1404 /* Return the current context's flags. The flags provide information on the
1405 * current request context (whether the client is a Lua script or in a MULTI),
1406 * and about the Redis instance in general, i.e replication and persistence.
1407 *
1408 * The available flags are:
1409 *
1410 * * REDISMODULE_CTX_FLAGS_LUA: The command is running in a Lua script
1411 *
1412 * * REDISMODULE_CTX_FLAGS_MULTI: The command is running inside a transaction
1413 *
1414 * * REDISMODULE_CTX_FLAGS_REPLICATED: The command was sent over the replication
1415 * link by the MASTER
1416 *
1417 * * REDISMODULE_CTX_FLAGS_MASTER: The Redis instance is a master
1418 *
1419 * * REDISMODULE_CTX_FLAGS_SLAVE: The Redis instance is a slave
1420 *
1421 * * REDISMODULE_CTX_FLAGS_READONLY: The Redis instance is read-only
1422 *
1423 * * REDISMODULE_CTX_FLAGS_CLUSTER: The Redis instance is in cluster mode
1424 *
1425 * * REDISMODULE_CTX_FLAGS_AOF: The Redis instance has AOF enabled
1426 *
1427 * * REDISMODULE_CTX_FLAGS_RDB: The instance has RDB enabled
1428 *
1429 * * REDISMODULE_CTX_FLAGS_MAXMEMORY: The instance has Maxmemory set
1430 *
1431 * * REDISMODULE_CTX_FLAGS_EVICT: Maxmemory is set and has an eviction
1432 * policy that may delete keys
1433 *
1434 * * REDISMODULE_CTX_FLAGS_OOM: Redis is out of memory according to the
1435 * maxmemory setting.
1436 *
1437 * * REDISMODULE_CTX_FLAGS_OOM_WARNING: Less than 25% of memory remains before
1438 * reaching the maxmemory level.
1439 */
RM_GetContextFlags(RedisModuleCtx * ctx)1440 int RM_GetContextFlags(RedisModuleCtx *ctx) {
1441
1442 int flags = 0;
1443 /* Client specific flags */
1444 if (ctx->client) {
1445 if (ctx->client->flags & CLIENT_LUA)
1446 flags |= REDISMODULE_CTX_FLAGS_LUA;
1447 if (ctx->client->flags & CLIENT_MULTI)
1448 flags |= REDISMODULE_CTX_FLAGS_MULTI;
1449 /* Module command recieved from MASTER, is replicated. */
1450 if (ctx->client->flags & CLIENT_MASTER)
1451 flags |= REDISMODULE_CTX_FLAGS_REPLICATED;
1452 }
1453
1454 if (server.cluster_enabled)
1455 flags |= REDISMODULE_CTX_FLAGS_CLUSTER;
1456
1457 /* Maxmemory and eviction policy */
1458 if (server.maxmemory > 0) {
1459 flags |= REDISMODULE_CTX_FLAGS_MAXMEMORY;
1460
1461 if (server.maxmemory_policy != MAXMEMORY_NO_EVICTION)
1462 flags |= REDISMODULE_CTX_FLAGS_EVICT;
1463 }
1464
1465 /* Persistence flags */
1466 if (server.aof_state != AOF_OFF)
1467 flags |= REDISMODULE_CTX_FLAGS_AOF;
1468 if (server.saveparamslen > 0)
1469 flags |= REDISMODULE_CTX_FLAGS_RDB;
1470
1471 /* Replication flags */
1472 if (server.masterhost == NULL) {
1473 flags |= REDISMODULE_CTX_FLAGS_MASTER;
1474 } else {
1475 flags |= REDISMODULE_CTX_FLAGS_SLAVE;
1476 if (server.repl_slave_ro)
1477 flags |= REDISMODULE_CTX_FLAGS_READONLY;
1478 }
1479
1480 /* OOM flag. */
1481 float level;
1482 int retval = getMaxmemoryState(NULL,NULL,NULL,&level);
1483 if (retval == C_ERR) flags |= REDISMODULE_CTX_FLAGS_OOM;
1484 if (level > 0.75) flags |= REDISMODULE_CTX_FLAGS_OOM_WARNING;
1485
1486 return flags;
1487 }
1488
1489 /* Change the currently selected DB. Returns an error if the id
1490 * is out of range.
1491 *
1492 * Note that the client will retain the currently selected DB even after
1493 * the Redis command implemented by the module calling this function
1494 * returns.
1495 *
1496 * If the module command wishes to change something in a different DB and
1497 * returns back to the original one, it should call RedisModule_GetSelectedDb()
1498 * before in order to restore the old DB number before returning. */
RM_SelectDb(RedisModuleCtx * ctx,int newid)1499 int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
1500 int retval = selectDb(ctx->client,newid);
1501 return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
1502 }
1503
1504 /* Return an handle representing a Redis key, so that it is possible
1505 * to call other APIs with the key handle as argument to perform
1506 * operations on the key.
1507 *
1508 * The return value is the handle representing the key, that must be
1509 * closed with RM_CloseKey().
1510 *
1511 * If the key does not exist and WRITE mode is requested, the handle
1512 * is still returned, since it is possible to perform operations on
1513 * a yet not existing key (that will be created, for example, after
1514 * a list push operation). If the mode is just READ instead, and the
1515 * key does not exist, NULL is returned. However it is still safe to
1516 * call RedisModule_CloseKey() and RedisModule_KeyType() on a NULL
1517 * value. */
RM_OpenKey(RedisModuleCtx * ctx,robj * keyname,int mode)1518 void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
1519 RedisModuleKey *kp;
1520 robj *value;
1521
1522 if (mode & REDISMODULE_WRITE) {
1523 value = lookupKeyWrite(ctx->client->db,keyname);
1524 } else {
1525 value = lookupKeyRead(ctx->client->db,keyname);
1526 if (value == NULL) {
1527 return NULL;
1528 }
1529 }
1530
1531 /* Setup the key handle. */
1532 kp = zmalloc(sizeof(*kp));
1533 kp->ctx = ctx;
1534 kp->db = ctx->client->db;
1535 kp->key = keyname;
1536 incrRefCount(keyname);
1537 kp->value = value;
1538 kp->iter = NULL;
1539 kp->mode = mode;
1540 zsetKeyReset(kp);
1541 autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp);
1542 return (void*)kp;
1543 }
1544
1545 /* Close a key handle. */
RM_CloseKey(RedisModuleKey * key)1546 void RM_CloseKey(RedisModuleKey *key) {
1547 if (key == NULL) return;
1548 if (key->mode & REDISMODULE_WRITE) signalModifiedKey(key->db,key->key);
1549 /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
1550 RM_ZsetRangeStop(key);
1551 decrRefCount(key->key);
1552 autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key);
1553 zfree(key);
1554 }
1555
1556 /* Return the type of the key. If the key pointer is NULL then
1557 * REDISMODULE_KEYTYPE_EMPTY is returned. */
RM_KeyType(RedisModuleKey * key)1558 int RM_KeyType(RedisModuleKey *key) {
1559 if (key == NULL || key->value == NULL) return REDISMODULE_KEYTYPE_EMPTY;
1560 /* We map between defines so that we are free to change the internal
1561 * defines as desired. */
1562 switch(key->value->type) {
1563 case OBJ_STRING: return REDISMODULE_KEYTYPE_STRING;
1564 case OBJ_LIST: return REDISMODULE_KEYTYPE_LIST;
1565 case OBJ_SET: return REDISMODULE_KEYTYPE_SET;
1566 case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET;
1567 case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH;
1568 case OBJ_MODULE: return REDISMODULE_KEYTYPE_MODULE;
1569 default: return 0;
1570 }
1571 }
1572
1573 /* Return the length of the value associated with the key.
1574 * For strings this is the length of the string. For all the other types
1575 * is the number of elements (just counting keys for hashes).
1576 *
1577 * If the key pointer is NULL or the key is empty, zero is returned. */
RM_ValueLength(RedisModuleKey * key)1578 size_t RM_ValueLength(RedisModuleKey *key) {
1579 if (key == NULL || key->value == NULL) return 0;
1580 switch(key->value->type) {
1581 case OBJ_STRING: return stringObjectLen(key->value);
1582 case OBJ_LIST: return listTypeLength(key->value);
1583 case OBJ_SET: return setTypeSize(key->value);
1584 case OBJ_ZSET: return zsetLength(key->value);
1585 case OBJ_HASH: return hashTypeLength(key->value);
1586 default: return 0;
1587 }
1588 }
1589
1590 /* If the key is open for writing, remove it, and setup the key to
1591 * accept new writes as an empty key (that will be created on demand).
1592 * On success REDISMODULE_OK is returned. If the key is not open for
1593 * writing REDISMODULE_ERR is returned. */
RM_DeleteKey(RedisModuleKey * key)1594 int RM_DeleteKey(RedisModuleKey *key) {
1595 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1596 if (key->value) {
1597 dbDelete(key->db,key->key);
1598 key->value = NULL;
1599 }
1600 return REDISMODULE_OK;
1601 }
1602
1603 /* If the key is open for writing, unlink it (that is delete it in a
1604 * non-blocking way, not reclaiming memory immediately) and setup the key to
1605 * accept new writes as an empty key (that will be created on demand).
1606 * On success REDISMODULE_OK is returned. If the key is not open for
1607 * writing REDISMODULE_ERR is returned. */
RM_UnlinkKey(RedisModuleKey * key)1608 int RM_UnlinkKey(RedisModuleKey *key) {
1609 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1610 if (key->value) {
1611 dbAsyncDelete(key->db,key->key);
1612 key->value = NULL;
1613 }
1614 return REDISMODULE_OK;
1615 }
1616
1617 /* Return the key expire value, as milliseconds of remaining TTL.
1618 * If no TTL is associated with the key or if the key is empty,
1619 * REDISMODULE_NO_EXPIRE is returned. */
RM_GetExpire(RedisModuleKey * key)1620 mstime_t RM_GetExpire(RedisModuleKey *key) {
1621 mstime_t expire = getExpire(key->db,key->key);
1622 if (expire == -1 || key->value == NULL) return -1;
1623 expire -= mstime();
1624 return expire >= 0 ? expire : 0;
1625 }
1626
1627 /* Set a new expire for the key. If the special expire
1628 * REDISMODULE_NO_EXPIRE is set, the expire is cancelled if there was
1629 * one (the same as the PERSIST command).
1630 *
1631 * Note that the expire must be provided as a positive integer representing
1632 * the number of milliseconds of TTL the key should have.
1633 *
1634 * The function returns REDISMODULE_OK on success or REDISMODULE_ERR if
1635 * the key was not open for writing or is an empty key. */
RM_SetExpire(RedisModuleKey * key,mstime_t expire)1636 int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
1637 if (!(key->mode & REDISMODULE_WRITE) || key->value == NULL)
1638 return REDISMODULE_ERR;
1639 if (expire != REDISMODULE_NO_EXPIRE) {
1640 expire += mstime();
1641 setExpire(key->ctx->client,key->db,key->key,expire);
1642 } else {
1643 removeExpire(key->db,key->key);
1644 }
1645 return REDISMODULE_OK;
1646 }
1647
1648 /* --------------------------------------------------------------------------
1649 * Key API for String type
1650 * -------------------------------------------------------------------------- */
1651
1652 /* If the key is open for writing, set the specified string 'str' as the
1653 * value of the key, deleting the old value if any.
1654 * On success REDISMODULE_OK is returned. If the key is not open for
1655 * writing or there is an active iterator, REDISMODULE_ERR is returned. */
RM_StringSet(RedisModuleKey * key,RedisModuleString * str)1656 int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) {
1657 if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
1658 RM_DeleteKey(key);
1659 setKey(key->db,key->key,str);
1660 key->value = str;
1661 return REDISMODULE_OK;
1662 }
1663
1664 /* Prepare the key associated string value for DMA access, and returns
1665 * a pointer and size (by reference), that the user can use to read or
1666 * modify the string in-place accessing it directly via pointer.
1667 *
1668 * The 'mode' is composed by bitwise OR-ing the following flags:
1669 *
1670 * REDISMODULE_READ -- Read access
1671 * REDISMODULE_WRITE -- Write access
1672 *
1673 * If the DMA is not requested for writing, the pointer returned should
1674 * only be accessed in a read-only fashion.
1675 *
1676 * On error (wrong type) NULL is returned.
1677 *
1678 * DMA access rules:
1679 *
1680 * 1. No other key writing function should be called since the moment
1681 * the pointer is obtained, for all the time we want to use DMA access
1682 * to read or modify the string.
1683 *
1684 * 2. Each time RM_StringTruncate() is called, to continue with the DMA
1685 * access, RM_StringDMA() should be called again to re-obtain
1686 * a new pointer and length.
1687 *
1688 * 3. If the returned pointer is not NULL, but the length is zero, no
1689 * byte can be touched (the string is empty, or the key itself is empty)
1690 * so a RM_StringTruncate() call should be used if there is to enlarge
1691 * the string, and later call StringDMA() again to get the pointer.
1692 */
RM_StringDMA(RedisModuleKey * key,size_t * len,int mode)1693 char *RM_StringDMA(RedisModuleKey *key, size_t *len, int mode) {
1694 /* We need to return *some* pointer for empty keys, we just return
1695 * a string literal pointer, that is the advantage to be mapped into
1696 * a read only memory page, so the module will segfault if a write
1697 * attempt is performed. */
1698 char *emptystring = "<dma-empty-string>";
1699 if (key->value == NULL) {
1700 *len = 0;
1701 return emptystring;
1702 }
1703
1704 if (key->value->type != OBJ_STRING) return NULL;
1705
1706 /* For write access, and even for read access if the object is encoded,
1707 * we unshare the string (that has the side effect of decoding it). */
1708 if ((mode & REDISMODULE_WRITE) || key->value->encoding != OBJ_ENCODING_RAW)
1709 key->value = dbUnshareStringValue(key->db, key->key, key->value);
1710
1711 *len = sdslen(key->value->ptr);
1712 return key->value->ptr;
1713 }
1714
1715 /* If the string is open for writing and is of string type, resize it, padding
1716 * with zero bytes if the new length is greater than the old one.
1717 *
1718 * After this call, RM_StringDMA() must be called again to continue
1719 * DMA access with the new pointer.
1720 *
1721 * The function returns REDISMODULE_OK on success, and REDISMODULE_ERR on
1722 * error, that is, the key is not open for writing, is not a string
1723 * or resizing for more than 512 MB is requested.
1724 *
1725 * If the key is empty, a string key is created with the new string value
1726 * unless the new length value requested is zero. */
RM_StringTruncate(RedisModuleKey * key,size_t newlen)1727 int RM_StringTruncate(RedisModuleKey *key, size_t newlen) {
1728 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1729 if (key->value && key->value->type != OBJ_STRING) return REDISMODULE_ERR;
1730 if (newlen > 512*1024*1024) return REDISMODULE_ERR;
1731
1732 /* Empty key and new len set to 0. Just return REDISMODULE_OK without
1733 * doing anything. */
1734 if (key->value == NULL && newlen == 0) return REDISMODULE_OK;
1735
1736 if (key->value == NULL) {
1737 /* Empty key: create it with the new size. */
1738 robj *o = createObject(OBJ_STRING,sdsnewlen(NULL, newlen));
1739 setKey(key->db,key->key,o);
1740 key->value = o;
1741 decrRefCount(o);
1742 } else {
1743 /* Unshare and resize. */
1744 key->value = dbUnshareStringValue(key->db, key->key, key->value);
1745 size_t curlen = sdslen(key->value->ptr);
1746 if (newlen > curlen) {
1747 key->value->ptr = sdsgrowzero(key->value->ptr,newlen);
1748 } else if (newlen < curlen) {
1749 sdsrange(key->value->ptr,0,newlen-1);
1750 /* If the string is too wasteful, reallocate it. */
1751 if (sdslen(key->value->ptr) < sdsavail(key->value->ptr))
1752 key->value->ptr = sdsRemoveFreeSpace(key->value->ptr);
1753 }
1754 }
1755 return REDISMODULE_OK;
1756 }
1757
1758 /* --------------------------------------------------------------------------
1759 * Key API for List type
1760 * -------------------------------------------------------------------------- */
1761
1762 /* Push an element into a list, on head or tail depending on 'where' argument.
1763 * If the key pointer is about an empty key opened for writing, the key
1764 * is created. On error (key opened for read-only operations or of the wrong
1765 * type) REDISMODULE_ERR is returned, otherwise REDISMODULE_OK is returned. */
RM_ListPush(RedisModuleKey * key,int where,RedisModuleString * ele)1766 int RM_ListPush(RedisModuleKey *key, int where, RedisModuleString *ele) {
1767 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1768 if (key->value && key->value->type != OBJ_LIST) return REDISMODULE_ERR;
1769 if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_LIST);
1770 listTypePush(key->value, ele,
1771 (where == REDISMODULE_LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL);
1772 return REDISMODULE_OK;
1773 }
1774
1775 /* Pop an element from the list, and returns it as a module string object
1776 * that the user should be free with RM_FreeString() or by enabling
1777 * automatic memory. 'where' specifies if the element should be popped from
1778 * head or tail. The command returns NULL if:
1779 * 1) The list is empty.
1780 * 2) The key was not open for writing.
1781 * 3) The key is not a list. */
RM_ListPop(RedisModuleKey * key,int where)1782 RedisModuleString *RM_ListPop(RedisModuleKey *key, int where) {
1783 if (!(key->mode & REDISMODULE_WRITE) ||
1784 key->value == NULL ||
1785 key->value->type != OBJ_LIST) return NULL;
1786 robj *ele = listTypePop(key->value,
1787 (where == REDISMODULE_LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL);
1788 robj *decoded = getDecodedObject(ele);
1789 decrRefCount(ele);
1790 moduleDelKeyIfEmpty(key);
1791 autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,decoded);
1792 return decoded;
1793 }
1794
1795 /* --------------------------------------------------------------------------
1796 * Key API for Sorted Set type
1797 * -------------------------------------------------------------------------- */
1798
1799 /* Conversion from/to public flags of the Modules API and our private flags,
1800 * so that we have everything decoupled. */
RM_ZsetAddFlagsToCoreFlags(int flags)1801 int RM_ZsetAddFlagsToCoreFlags(int flags) {
1802 int retflags = 0;
1803 if (flags & REDISMODULE_ZADD_XX) retflags |= ZADD_XX;
1804 if (flags & REDISMODULE_ZADD_NX) retflags |= ZADD_NX;
1805 return retflags;
1806 }
1807
1808 /* See previous function comment. */
RM_ZsetAddFlagsFromCoreFlags(int flags)1809 int RM_ZsetAddFlagsFromCoreFlags(int flags) {
1810 int retflags = 0;
1811 if (flags & ZADD_ADDED) retflags |= REDISMODULE_ZADD_ADDED;
1812 if (flags & ZADD_UPDATED) retflags |= REDISMODULE_ZADD_UPDATED;
1813 if (flags & ZADD_NOP) retflags |= REDISMODULE_ZADD_NOP;
1814 return retflags;
1815 }
1816
1817 /* Add a new element into a sorted set, with the specified 'score'.
1818 * If the element already exists, the score is updated.
1819 *
1820 * A new sorted set is created at value if the key is an empty open key
1821 * setup for writing.
1822 *
1823 * Additional flags can be passed to the function via a pointer, the flags
1824 * are both used to receive input and to communicate state when the function
1825 * returns. 'flagsptr' can be NULL if no special flags are used.
1826 *
1827 * The input flags are:
1828 *
1829 * REDISMODULE_ZADD_XX: Element must already exist. Do nothing otherwise.
1830 * REDISMODULE_ZADD_NX: Element must not exist. Do nothing otherwise.
1831 *
1832 * The output flags are:
1833 *
1834 * REDISMODULE_ZADD_ADDED: The new element was added to the sorted set.
1835 * REDISMODULE_ZADD_UPDATED: The score of the element was updated.
1836 * REDISMODULE_ZADD_NOP: No operation was performed because XX or NX flags.
1837 *
1838 * On success the function returns REDISMODULE_OK. On the following errors
1839 * REDISMODULE_ERR is returned:
1840 *
1841 * * The key was not opened for writing.
1842 * * The key is of the wrong type.
1843 * * 'score' double value is not a number (NaN).
1844 */
RM_ZsetAdd(RedisModuleKey * key,double score,RedisModuleString * ele,int * flagsptr)1845 int RM_ZsetAdd(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr) {
1846 int flags = 0;
1847 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1848 if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
1849 if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_ZSET);
1850 if (flagsptr) flags = RM_ZsetAddFlagsToCoreFlags(*flagsptr);
1851 if (zsetAdd(key->value,score,ele->ptr,&flags,NULL) == 0) {
1852 if (flagsptr) *flagsptr = 0;
1853 return REDISMODULE_ERR;
1854 }
1855 if (flagsptr) *flagsptr = RM_ZsetAddFlagsFromCoreFlags(flags);
1856 return REDISMODULE_OK;
1857 }
1858
1859 /* This function works exactly like RM_ZsetAdd(), but instead of setting
1860 * a new score, the score of the existing element is incremented, or if the
1861 * element does not already exist, it is added assuming the old score was
1862 * zero.
1863 *
1864 * The input and output flags, and the return value, have the same exact
1865 * meaning, with the only difference that this function will return
1866 * REDISMODULE_ERR even when 'score' is a valid double number, but adding it
1867 * to the existing score results into a NaN (not a number) condition.
1868 *
1869 * This function has an additional field 'newscore', if not NULL is filled
1870 * with the new score of the element after the increment, if no error
1871 * is returned. */
RM_ZsetIncrby(RedisModuleKey * key,double score,RedisModuleString * ele,int * flagsptr,double * newscore)1872 int RM_ZsetIncrby(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore) {
1873 int flags = 0;
1874 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1875 if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
1876 if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_ZSET);
1877 if (flagsptr) flags = RM_ZsetAddFlagsToCoreFlags(*flagsptr);
1878 flags |= ZADD_INCR;
1879 if (zsetAdd(key->value,score,ele->ptr,&flags,newscore) == 0) {
1880 if (flagsptr) *flagsptr = 0;
1881 return REDISMODULE_ERR;
1882 }
1883 /* zsetAdd() may signal back that the resulting score is not a number. */
1884 if (flagsptr && (*flagsptr & ZADD_NAN)) {
1885 *flagsptr = 0;
1886 return REDISMODULE_ERR;
1887 }
1888 if (flagsptr) *flagsptr = RM_ZsetAddFlagsFromCoreFlags(flags);
1889 return REDISMODULE_OK;
1890 }
1891
1892 /* Remove the specified element from the sorted set.
1893 * The function returns REDISMODULE_OK on success, and REDISMODULE_ERR
1894 * on one of the following conditions:
1895 *
1896 * * The key was not opened for writing.
1897 * * The key is of the wrong type.
1898 *
1899 * The return value does NOT indicate the fact the element was really
1900 * removed (since it existed) or not, just if the function was executed
1901 * with success.
1902 *
1903 * In order to know if the element was removed, the additional argument
1904 * 'deleted' must be passed, that populates the integer by reference
1905 * setting it to 1 or 0 depending on the outcome of the operation.
1906 * The 'deleted' argument can be NULL if the caller is not interested
1907 * to know if the element was really removed.
1908 *
1909 * Empty keys will be handled correctly by doing nothing. */
RM_ZsetRem(RedisModuleKey * key,RedisModuleString * ele,int * deleted)1910 int RM_ZsetRem(RedisModuleKey *key, RedisModuleString *ele, int *deleted) {
1911 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1912 if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
1913 if (key->value != NULL && zsetDel(key->value,ele->ptr)) {
1914 if (deleted) *deleted = 1;
1915 } else {
1916 if (deleted) *deleted = 0;
1917 }
1918 return REDISMODULE_OK;
1919 }
1920
1921 /* On success retrieve the double score associated at the sorted set element
1922 * 'ele' and returns REDISMODULE_OK. Otherwise REDISMODULE_ERR is returned
1923 * to signal one of the following conditions:
1924 *
1925 * * There is no such element 'ele' in the sorted set.
1926 * * The key is not a sorted set.
1927 * * The key is an open empty key.
1928 */
RM_ZsetScore(RedisModuleKey * key,RedisModuleString * ele,double * score)1929 int RM_ZsetScore(RedisModuleKey *key, RedisModuleString *ele, double *score) {
1930 if (key->value == NULL) return REDISMODULE_ERR;
1931 if (key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
1932 if (zsetScore(key->value,ele->ptr,score) == C_ERR) return REDISMODULE_ERR;
1933 return REDISMODULE_OK;
1934 }
1935
1936 /* --------------------------------------------------------------------------
1937 * Key API for Sorted Set iterator
1938 * -------------------------------------------------------------------------- */
1939
zsetKeyReset(RedisModuleKey * key)1940 void zsetKeyReset(RedisModuleKey *key) {
1941 key->ztype = REDISMODULE_ZSET_RANGE_NONE;
1942 key->zcurrent = NULL;
1943 key->zer = 1;
1944 }
1945
1946 /* Stop a sorted set iteration. */
RM_ZsetRangeStop(RedisModuleKey * key)1947 void RM_ZsetRangeStop(RedisModuleKey *key) {
1948 /* Free resources if needed. */
1949 if (key->ztype == REDISMODULE_ZSET_RANGE_LEX)
1950 zslFreeLexRange(&key->zlrs);
1951 /* Setup sensible values so that misused iteration API calls when an
1952 * iterator is not active will result into something more sensible
1953 * than crashing. */
1954 zsetKeyReset(key);
1955 }
1956
1957 /* Return the "End of range" flag value to signal the end of the iteration. */
RM_ZsetRangeEndReached(RedisModuleKey * key)1958 int RM_ZsetRangeEndReached(RedisModuleKey *key) {
1959 return key->zer;
1960 }
1961
1962 /* Helper function for RM_ZsetFirstInScoreRange() and RM_ZsetLastInScoreRange().
1963 * Setup the sorted set iteration according to the specified score range
1964 * (see the functions calling it for more info). If 'first' is true the
1965 * first element in the range is used as a starting point for the iterator
1966 * otherwise the last. Return REDISMODULE_OK on success otherwise
1967 * REDISMODULE_ERR. */
zsetInitScoreRange(RedisModuleKey * key,double min,double max,int minex,int maxex,int first)1968 int zsetInitScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex, int first) {
1969 if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
1970
1971 RM_ZsetRangeStop(key);
1972 key->ztype = REDISMODULE_ZSET_RANGE_SCORE;
1973 key->zer = 0;
1974
1975 /* Setup the range structure used by the sorted set core implementation
1976 * in order to seek at the specified element. */
1977 zrangespec *zrs = &key->zrs;
1978 zrs->min = min;
1979 zrs->max = max;
1980 zrs->minex = minex;
1981 zrs->maxex = maxex;
1982
1983 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
1984 key->zcurrent = first ? zzlFirstInRange(key->value->ptr,zrs) :
1985 zzlLastInRange(key->value->ptr,zrs);
1986 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
1987 zset *zs = key->value->ptr;
1988 zskiplist *zsl = zs->zsl;
1989 key->zcurrent = first ? zslFirstInRange(zsl,zrs) :
1990 zslLastInRange(zsl,zrs);
1991 } else {
1992 serverPanic("Unsupported zset encoding");
1993 }
1994 if (key->zcurrent == NULL) key->zer = 1;
1995 return REDISMODULE_OK;
1996 }
1997
1998 /* Setup a sorted set iterator seeking the first element in the specified
1999 * range. Returns REDISMODULE_OK if the iterator was correctly initialized
2000 * otherwise REDISMODULE_ERR is returned in the following conditions:
2001 *
2002 * 1. The value stored at key is not a sorted set or the key is empty.
2003 *
2004 * The range is specified according to the two double values 'min' and 'max'.
2005 * Both can be infinite using the following two macros:
2006 *
2007 * REDISMODULE_POSITIVE_INFINITE for positive infinite value
2008 * REDISMODULE_NEGATIVE_INFINITE for negative infinite value
2009 *
2010 * 'minex' and 'maxex' parameters, if true, respectively setup a range
2011 * where the min and max value are exclusive (not included) instead of
2012 * inclusive. */
RM_ZsetFirstInScoreRange(RedisModuleKey * key,double min,double max,int minex,int maxex)2013 int RM_ZsetFirstInScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex) {
2014 return zsetInitScoreRange(key,min,max,minex,maxex,1);
2015 }
2016
2017 /* Exactly like RedisModule_ZsetFirstInScoreRange() but the last element of
2018 * the range is selected for the start of the iteration instead. */
RM_ZsetLastInScoreRange(RedisModuleKey * key,double min,double max,int minex,int maxex)2019 int RM_ZsetLastInScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex) {
2020 return zsetInitScoreRange(key,min,max,minex,maxex,0);
2021 }
2022
2023 /* Helper function for RM_ZsetFirstInLexRange() and RM_ZsetLastInLexRange().
2024 * Setup the sorted set iteration according to the specified lexicographical
2025 * range (see the functions calling it for more info). If 'first' is true the
2026 * first element in the range is used as a starting point for the iterator
2027 * otherwise the last. Return REDISMODULE_OK on success otherwise
2028 * REDISMODULE_ERR.
2029 *
2030 * Note that this function takes 'min' and 'max' in the same form of the
2031 * Redis ZRANGEBYLEX command. */
zsetInitLexRange(RedisModuleKey * key,RedisModuleString * min,RedisModuleString * max,int first)2032 int zsetInitLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max, int first) {
2033 if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
2034
2035 RM_ZsetRangeStop(key);
2036 key->zer = 0;
2037
2038 /* Setup the range structure used by the sorted set core implementation
2039 * in order to seek at the specified element. */
2040 zlexrangespec *zlrs = &key->zlrs;
2041 if (zslParseLexRange(min, max, zlrs) == C_ERR) return REDISMODULE_ERR;
2042
2043 /* Set the range type to lex only after successfully parsing the range,
2044 * otherwise we don't want the zlexrangespec to be freed. */
2045 key->ztype = REDISMODULE_ZSET_RANGE_LEX;
2046
2047 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
2048 key->zcurrent = first ? zzlFirstInLexRange(key->value->ptr,zlrs) :
2049 zzlLastInLexRange(key->value->ptr,zlrs);
2050 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
2051 zset *zs = key->value->ptr;
2052 zskiplist *zsl = zs->zsl;
2053 key->zcurrent = first ? zslFirstInLexRange(zsl,zlrs) :
2054 zslLastInLexRange(zsl,zlrs);
2055 } else {
2056 serverPanic("Unsupported zset encoding");
2057 }
2058 if (key->zcurrent == NULL) key->zer = 1;
2059
2060 return REDISMODULE_OK;
2061 }
2062
2063 /* Setup a sorted set iterator seeking the first element in the specified
2064 * lexicographical range. Returns REDISMODULE_OK if the iterator was correctly
2065 * initialized otherwise REDISMODULE_ERR is returned in the
2066 * following conditions:
2067 *
2068 * 1. The value stored at key is not a sorted set or the key is empty.
2069 * 2. The lexicographical range 'min' and 'max' format is invalid.
2070 *
2071 * 'min' and 'max' should be provided as two RedisModuleString objects
2072 * in the same format as the parameters passed to the ZRANGEBYLEX command.
2073 * The function does not take ownership of the objects, so they can be released
2074 * ASAP after the iterator is setup. */
RM_ZsetFirstInLexRange(RedisModuleKey * key,RedisModuleString * min,RedisModuleString * max)2075 int RM_ZsetFirstInLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max) {
2076 return zsetInitLexRange(key,min,max,1);
2077 }
2078
2079 /* Exactly like RedisModule_ZsetFirstInLexRange() but the last element of
2080 * the range is selected for the start of the iteration instead. */
RM_ZsetLastInLexRange(RedisModuleKey * key,RedisModuleString * min,RedisModuleString * max)2081 int RM_ZsetLastInLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max) {
2082 return zsetInitLexRange(key,min,max,0);
2083 }
2084
2085 /* Return the current sorted set element of an active sorted set iterator
2086 * or NULL if the range specified in the iterator does not include any
2087 * element. */
RM_ZsetRangeCurrentElement(RedisModuleKey * key,double * score)2088 RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score) {
2089 RedisModuleString *str;
2090
2091 if (key->zcurrent == NULL) return NULL;
2092 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
2093 unsigned char *eptr, *sptr;
2094 eptr = key->zcurrent;
2095 sds ele = ziplistGetObject(eptr);
2096 if (score) {
2097 sptr = ziplistNext(key->value->ptr,eptr);
2098 *score = zzlGetScore(sptr);
2099 }
2100 str = createObject(OBJ_STRING,ele);
2101 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
2102 zskiplistNode *ln = key->zcurrent;
2103 if (score) *score = ln->score;
2104 str = createStringObject(ln->ele,sdslen(ln->ele));
2105 } else {
2106 serverPanic("Unsupported zset encoding");
2107 }
2108 autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,str);
2109 return str;
2110 }
2111
2112 /* Go to the next element of the sorted set iterator. Returns 1 if there was
2113 * a next element, 0 if we are already at the latest element or the range
2114 * does not include any item at all. */
RM_ZsetRangeNext(RedisModuleKey * key)2115 int RM_ZsetRangeNext(RedisModuleKey *key) {
2116 if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */
2117
2118 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
2119 unsigned char *zl = key->value->ptr;
2120 unsigned char *eptr = key->zcurrent;
2121 unsigned char *next;
2122 next = ziplistNext(zl,eptr); /* Skip element. */
2123 if (next) next = ziplistNext(zl,next); /* Skip score. */
2124 if (next == NULL) {
2125 key->zer = 1;
2126 return 0;
2127 } else {
2128 /* Are we still within the range? */
2129 if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) {
2130 /* Fetch the next element score for the
2131 * range check. */
2132 unsigned char *saved_next = next;
2133 next = ziplistNext(zl,next); /* Skip next element. */
2134 double score = zzlGetScore(next); /* Obtain the next score. */
2135 if (!zslValueLteMax(score,&key->zrs)) {
2136 key->zer = 1;
2137 return 0;
2138 }
2139 next = saved_next;
2140 } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
2141 if (!zzlLexValueLteMax(next,&key->zlrs)) {
2142 key->zer = 1;
2143 return 0;
2144 }
2145 }
2146 key->zcurrent = next;
2147 return 1;
2148 }
2149 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
2150 zskiplistNode *ln = key->zcurrent, *next = ln->level[0].forward;
2151 if (next == NULL) {
2152 key->zer = 1;
2153 return 0;
2154 } else {
2155 /* Are we still within the range? */
2156 if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE &&
2157 !zslValueLteMax(next->score,&key->zrs))
2158 {
2159 key->zer = 1;
2160 return 0;
2161 } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
2162 if (!zslLexValueLteMax(next->ele,&key->zlrs)) {
2163 key->zer = 1;
2164 return 0;
2165 }
2166 }
2167 key->zcurrent = next;
2168 return 1;
2169 }
2170 } else {
2171 serverPanic("Unsupported zset encoding");
2172 }
2173 }
2174
2175 /* Go to the previous element of the sorted set iterator. Returns 1 if there was
2176 * a previous element, 0 if we are already at the first element or the range
2177 * does not include any item at all. */
RM_ZsetRangePrev(RedisModuleKey * key)2178 int RM_ZsetRangePrev(RedisModuleKey *key) {
2179 if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */
2180
2181 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
2182 unsigned char *zl = key->value->ptr;
2183 unsigned char *eptr = key->zcurrent;
2184 unsigned char *prev;
2185 prev = ziplistPrev(zl,eptr); /* Go back to previous score. */
2186 if (prev) prev = ziplistPrev(zl,prev); /* Back to previous ele. */
2187 if (prev == NULL) {
2188 key->zer = 1;
2189 return 0;
2190 } else {
2191 /* Are we still within the range? */
2192 if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) {
2193 /* Fetch the previous element score for the
2194 * range check. */
2195 unsigned char *saved_prev = prev;
2196 prev = ziplistNext(zl,prev); /* Skip element to get the score.*/
2197 double score = zzlGetScore(prev); /* Obtain the prev score. */
2198 if (!zslValueGteMin(score,&key->zrs)) {
2199 key->zer = 1;
2200 return 0;
2201 }
2202 prev = saved_prev;
2203 } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
2204 if (!zzlLexValueGteMin(prev,&key->zlrs)) {
2205 key->zer = 1;
2206 return 0;
2207 }
2208 }
2209 key->zcurrent = prev;
2210 return 1;
2211 }
2212 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
2213 zskiplistNode *ln = key->zcurrent, *prev = ln->backward;
2214 if (prev == NULL) {
2215 key->zer = 1;
2216 return 0;
2217 } else {
2218 /* Are we still within the range? */
2219 if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE &&
2220 !zslValueGteMin(prev->score,&key->zrs))
2221 {
2222 key->zer = 1;
2223 return 0;
2224 } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
2225 if (!zslLexValueGteMin(prev->ele,&key->zlrs)) {
2226 key->zer = 1;
2227 return 0;
2228 }
2229 }
2230 key->zcurrent = prev;
2231 return 1;
2232 }
2233 } else {
2234 serverPanic("Unsupported zset encoding");
2235 }
2236 }
2237
2238 /* --------------------------------------------------------------------------
2239 * Key API for Hash type
2240 * -------------------------------------------------------------------------- */
2241
2242 /* Set the field of the specified hash field to the specified value.
2243 * If the key is an empty key open for writing, it is created with an empty
2244 * hash value, in order to set the specified field.
2245 *
2246 * The function is variadic and the user must specify pairs of field
2247 * names and values, both as RedisModuleString pointers (unless the
2248 * CFIELD option is set, see later). At the end of the field/value-ptr pairs,
2249 * NULL must be specified as last argument to signal the end of the arguments
2250 * in the variadic function.
2251 *
2252 * Example to set the hash argv[1] to the value argv[2]:
2253 *
2254 * RedisModule_HashSet(key,REDISMODULE_HASH_NONE,argv[1],argv[2],NULL);
2255 *
2256 * The function can also be used in order to delete fields (if they exist)
2257 * by setting them to the specified value of REDISMODULE_HASH_DELETE:
2258 *
2259 * RedisModule_HashSet(key,REDISMODULE_HASH_NONE,argv[1],
2260 * REDISMODULE_HASH_DELETE,NULL);
2261 *
2262 * The behavior of the command changes with the specified flags, that can be
2263 * set to REDISMODULE_HASH_NONE if no special behavior is needed.
2264 *
2265 * REDISMODULE_HASH_NX: The operation is performed only if the field was not
2266 * already existing in the hash.
2267 * REDISMODULE_HASH_XX: The operation is performed only if the field was
2268 * already existing, so that a new value could be
2269 * associated to an existing filed, but no new fields
2270 * are created.
2271 * REDISMODULE_HASH_CFIELDS: The field names passed are null terminated C
2272 * strings instead of RedisModuleString objects.
2273 *
2274 * Unless NX is specified, the command overwrites the old field value with
2275 * the new one.
2276 *
2277 * When using REDISMODULE_HASH_CFIELDS, field names are reported using
2278 * normal C strings, so for example to delete the field "foo" the following
2279 * code can be used:
2280 *
2281 * RedisModule_HashSet(key,REDISMODULE_HASH_CFIELDS,"foo",
2282 * REDISMODULE_HASH_DELETE,NULL);
2283 *
2284 * Return value:
2285 *
2286 * The number of fields updated (that may be less than the number of fields
2287 * specified because of the XX or NX options).
2288 *
2289 * In the following case the return value is always zero:
2290 *
2291 * * The key was not open for writing.
2292 * * The key was associated with a non Hash value.
2293 */
RM_HashSet(RedisModuleKey * key,int flags,...)2294 int RM_HashSet(RedisModuleKey *key, int flags, ...) {
2295 va_list ap;
2296 if (!(key->mode & REDISMODULE_WRITE)) return 0;
2297 if (key->value && key->value->type != OBJ_HASH) return 0;
2298 if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_HASH);
2299
2300 int updated = 0;
2301 va_start(ap, flags);
2302 while(1) {
2303 RedisModuleString *field, *value;
2304 /* Get the field and value objects. */
2305 if (flags & REDISMODULE_HASH_CFIELDS) {
2306 char *cfield = va_arg(ap,char*);
2307 if (cfield == NULL) break;
2308 field = createRawStringObject(cfield,strlen(cfield));
2309 } else {
2310 field = va_arg(ap,RedisModuleString*);
2311 if (field == NULL) break;
2312 }
2313 value = va_arg(ap,RedisModuleString*);
2314
2315 /* Handle XX and NX */
2316 if (flags & (REDISMODULE_HASH_XX|REDISMODULE_HASH_NX)) {
2317 int exists = hashTypeExists(key->value, field->ptr);
2318 if (((flags & REDISMODULE_HASH_XX) && !exists) ||
2319 ((flags & REDISMODULE_HASH_NX) && exists))
2320 {
2321 if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
2322 continue;
2323 }
2324 }
2325
2326 /* Handle deletion if value is REDISMODULE_HASH_DELETE. */
2327 if (value == REDISMODULE_HASH_DELETE) {
2328 updated += hashTypeDelete(key->value, field->ptr);
2329 if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
2330 continue;
2331 }
2332
2333 int low_flags = HASH_SET_COPY;
2334 /* If CFIELDS is active, we can pass the ownership of the
2335 * SDS object to the low level function that sets the field
2336 * to avoid a useless copy. */
2337 if (flags & REDISMODULE_HASH_CFIELDS)
2338 low_flags |= HASH_SET_TAKE_FIELD;
2339
2340 robj *argv[2] = {field,value};
2341 hashTypeTryConversion(key->value,argv,0,1);
2342 updated += hashTypeSet(key->value, field->ptr, value->ptr, low_flags);
2343
2344 /* If CFIELDS is active, SDS string ownership is now of hashTypeSet(),
2345 * however we still have to release the 'field' object shell. */
2346 if (flags & REDISMODULE_HASH_CFIELDS) {
2347 field->ptr = NULL; /* Prevent the SDS string from being freed. */
2348 decrRefCount(field);
2349 }
2350 }
2351 va_end(ap);
2352 moduleDelKeyIfEmpty(key);
2353 return updated;
2354 }
2355
2356 /* Get fields from an hash value. This function is called using a variable
2357 * number of arguments, alternating a field name (as a StringRedisModule
2358 * pointer) with a pointer to a StringRedisModule pointer, that is set to the
2359 * value of the field if the field exist, or NULL if the field did not exist.
2360 * At the end of the field/value-ptr pairs, NULL must be specified as last
2361 * argument to signal the end of the arguments in the variadic function.
2362 *
2363 * This is an example usage:
2364 *
2365 * RedisModuleString *first, *second;
2366 * RedisModule_HashGet(mykey,REDISMODULE_HASH_NONE,argv[1],&first,
2367 * argv[2],&second,NULL);
2368 *
2369 * As with RedisModule_HashSet() the behavior of the command can be specified
2370 * passing flags different than REDISMODULE_HASH_NONE:
2371 *
2372 * REDISMODULE_HASH_CFIELD: field names as null terminated C strings.
2373 *
2374 * REDISMODULE_HASH_EXISTS: instead of setting the value of the field
2375 * expecting a RedisModuleString pointer to pointer, the function just
2376 * reports if the field esists or not and expects an integer pointer
2377 * as the second element of each pair.
2378 *
2379 * Example of REDISMODULE_HASH_CFIELD:
2380 *
2381 * RedisModuleString *username, *hashedpass;
2382 * RedisModule_HashGet(mykey,"username",&username,"hp",&hashedpass, NULL);
2383 *
2384 * Example of REDISMODULE_HASH_EXISTS:
2385 *
2386 * int exists;
2387 * RedisModule_HashGet(mykey,argv[1],&exists,NULL);
2388 *
2389 * The function returns REDISMODULE_OK on success and REDISMODULE_ERR if
2390 * the key is not an hash value.
2391 *
2392 * Memory management:
2393 *
2394 * The returned RedisModuleString objects should be released with
2395 * RedisModule_FreeString(), or by enabling automatic memory management.
2396 */
RM_HashGet(RedisModuleKey * key,int flags,...)2397 int RM_HashGet(RedisModuleKey *key, int flags, ...) {
2398 va_list ap;
2399 if (key->value && key->value->type != OBJ_HASH) return REDISMODULE_ERR;
2400
2401 va_start(ap, flags);
2402 while(1) {
2403 RedisModuleString *field, **valueptr;
2404 int *existsptr;
2405 /* Get the field object and the value pointer to pointer. */
2406 if (flags & REDISMODULE_HASH_CFIELDS) {
2407 char *cfield = va_arg(ap,char*);
2408 if (cfield == NULL) break;
2409 field = createRawStringObject(cfield,strlen(cfield));
2410 } else {
2411 field = va_arg(ap,RedisModuleString*);
2412 if (field == NULL) break;
2413 }
2414
2415 /* Query the hash for existence or value object. */
2416 if (flags & REDISMODULE_HASH_EXISTS) {
2417 existsptr = va_arg(ap,int*);
2418 if (key->value)
2419 *existsptr = hashTypeExists(key->value,field->ptr);
2420 else
2421 *existsptr = 0;
2422 } else {
2423 valueptr = va_arg(ap,RedisModuleString**);
2424 if (key->value) {
2425 *valueptr = hashTypeGetValueObject(key->value,field->ptr);
2426 if (*valueptr) {
2427 robj *decoded = getDecodedObject(*valueptr);
2428 decrRefCount(*valueptr);
2429 *valueptr = decoded;
2430 }
2431 if (*valueptr)
2432 autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,*valueptr);
2433 } else {
2434 *valueptr = NULL;
2435 }
2436 }
2437
2438 /* Cleanup */
2439 if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
2440 }
2441 va_end(ap);
2442 return REDISMODULE_OK;
2443 }
2444
2445 /* --------------------------------------------------------------------------
2446 * Redis <-> Modules generic Call() API
2447 * -------------------------------------------------------------------------- */
2448
2449 /* Create a new RedisModuleCallReply object. The processing of the reply
2450 * is lazy, the object is just populated with the raw protocol and later
2451 * is processed as needed. Initially we just make sure to set the right
2452 * reply type, which is extremely cheap to do. */
moduleCreateCallReplyFromProto(RedisModuleCtx * ctx,sds proto)2453 RedisModuleCallReply *moduleCreateCallReplyFromProto(RedisModuleCtx *ctx, sds proto) {
2454 RedisModuleCallReply *reply = zmalloc(sizeof(*reply));
2455 reply->ctx = ctx;
2456 reply->proto = proto;
2457 reply->protolen = sdslen(proto);
2458 reply->flags = REDISMODULE_REPLYFLAG_TOPARSE; /* Lazy parsing. */
2459 switch(proto[0]) {
2460 case '$':
2461 case '+': reply->type = REDISMODULE_REPLY_STRING; break;
2462 case '-': reply->type = REDISMODULE_REPLY_ERROR; break;
2463 case ':': reply->type = REDISMODULE_REPLY_INTEGER; break;
2464 case '*': reply->type = REDISMODULE_REPLY_ARRAY; break;
2465 default: reply->type = REDISMODULE_REPLY_UNKNOWN; break;
2466 }
2467 if ((proto[0] == '*' || proto[0] == '$') && proto[1] == '-')
2468 reply->type = REDISMODULE_REPLY_NULL;
2469 return reply;
2470 }
2471
2472 void moduleParseCallReply_Int(RedisModuleCallReply *reply);
2473 void moduleParseCallReply_BulkString(RedisModuleCallReply *reply);
2474 void moduleParseCallReply_SimpleString(RedisModuleCallReply *reply);
2475 void moduleParseCallReply_Array(RedisModuleCallReply *reply);
2476
2477 /* Do nothing if REDISMODULE_REPLYFLAG_TOPARSE is false, otherwise
2478 * use the protcol of the reply in reply->proto in order to fill the
2479 * reply with parsed data according to the reply type. */
moduleParseCallReply(RedisModuleCallReply * reply)2480 void moduleParseCallReply(RedisModuleCallReply *reply) {
2481 if (!(reply->flags & REDISMODULE_REPLYFLAG_TOPARSE)) return;
2482 reply->flags &= ~REDISMODULE_REPLYFLAG_TOPARSE;
2483
2484 switch(reply->proto[0]) {
2485 case ':': moduleParseCallReply_Int(reply); break;
2486 case '$': moduleParseCallReply_BulkString(reply); break;
2487 case '-': /* handled by next item. */
2488 case '+': moduleParseCallReply_SimpleString(reply); break;
2489 case '*': moduleParseCallReply_Array(reply); break;
2490 }
2491 }
2492
moduleParseCallReply_Int(RedisModuleCallReply * reply)2493 void moduleParseCallReply_Int(RedisModuleCallReply *reply) {
2494 char *proto = reply->proto;
2495 char *p = strchr(proto+1,'\r');
2496
2497 string2ll(proto+1,p-proto-1,&reply->val.ll);
2498 reply->protolen = p-proto+2;
2499 reply->type = REDISMODULE_REPLY_INTEGER;
2500 }
2501
moduleParseCallReply_BulkString(RedisModuleCallReply * reply)2502 void moduleParseCallReply_BulkString(RedisModuleCallReply *reply) {
2503 char *proto = reply->proto;
2504 char *p = strchr(proto+1,'\r');
2505 long long bulklen;
2506
2507 string2ll(proto+1,p-proto-1,&bulklen);
2508 if (bulklen == -1) {
2509 reply->protolen = p-proto+2;
2510 reply->type = REDISMODULE_REPLY_NULL;
2511 } else {
2512 reply->val.str = p+2;
2513 reply->len = bulklen;
2514 reply->protolen = p-proto+2+bulklen+2;
2515 reply->type = REDISMODULE_REPLY_STRING;
2516 }
2517 }
2518
moduleParseCallReply_SimpleString(RedisModuleCallReply * reply)2519 void moduleParseCallReply_SimpleString(RedisModuleCallReply *reply) {
2520 char *proto = reply->proto;
2521 char *p = strchr(proto+1,'\r');
2522
2523 reply->val.str = proto+1;
2524 reply->len = p-proto-1;
2525 reply->protolen = p-proto+2;
2526 reply->type = proto[0] == '+' ? REDISMODULE_REPLY_STRING :
2527 REDISMODULE_REPLY_ERROR;
2528 }
2529
moduleParseCallReply_Array(RedisModuleCallReply * reply)2530 void moduleParseCallReply_Array(RedisModuleCallReply *reply) {
2531 char *proto = reply->proto;
2532 char *p = strchr(proto+1,'\r');
2533 long long arraylen, j;
2534
2535 string2ll(proto+1,p-proto-1,&arraylen);
2536 p += 2;
2537
2538 if (arraylen == -1) {
2539 reply->protolen = p-proto;
2540 reply->type = REDISMODULE_REPLY_NULL;
2541 return;
2542 }
2543
2544 reply->val.array = zmalloc(sizeof(RedisModuleCallReply)*arraylen);
2545 reply->len = arraylen;
2546 for (j = 0; j < arraylen; j++) {
2547 RedisModuleCallReply *ele = reply->val.array+j;
2548 ele->flags = REDISMODULE_REPLYFLAG_NESTED |
2549 REDISMODULE_REPLYFLAG_TOPARSE;
2550 ele->proto = p;
2551 ele->ctx = reply->ctx;
2552 moduleParseCallReply(ele);
2553 p += ele->protolen;
2554 }
2555 reply->protolen = p-proto;
2556 reply->type = REDISMODULE_REPLY_ARRAY;
2557 }
2558
2559 /* Free a Call reply and all the nested replies it contains if it's an
2560 * array. */
RM_FreeCallReply_Rec(RedisModuleCallReply * reply,int freenested)2561 void RM_FreeCallReply_Rec(RedisModuleCallReply *reply, int freenested){
2562 /* Don't free nested replies by default: the user must always free the
2563 * toplevel reply. However be gentle and don't crash if the module
2564 * misuses the API. */
2565 if (!freenested && reply->flags & REDISMODULE_REPLYFLAG_NESTED) return;
2566
2567 if (!(reply->flags & REDISMODULE_REPLYFLAG_TOPARSE)) {
2568 if (reply->type == REDISMODULE_REPLY_ARRAY) {
2569 size_t j;
2570 for (j = 0; j < reply->len; j++)
2571 RM_FreeCallReply_Rec(reply->val.array+j,1);
2572 zfree(reply->val.array);
2573 }
2574 }
2575
2576 /* For nested replies, we don't free reply->proto (which if not NULL
2577 * references the parent reply->proto buffer), nor the structure
2578 * itself which is allocated as an array of structures, and is freed
2579 * when the array value is released. */
2580 if (!(reply->flags & REDISMODULE_REPLYFLAG_NESTED)) {
2581 if (reply->proto) sdsfree(reply->proto);
2582 zfree(reply);
2583 }
2584 }
2585
2586 /* Wrapper for the recursive free reply function. This is needed in order
2587 * to have the first level function to return on nested replies, but only
2588 * if called by the module API. */
RM_FreeCallReply(RedisModuleCallReply * reply)2589 void RM_FreeCallReply(RedisModuleCallReply *reply) {
2590
2591 RedisModuleCtx *ctx = reply->ctx;
2592 RM_FreeCallReply_Rec(reply,0);
2593 autoMemoryFreed(ctx,REDISMODULE_AM_REPLY,reply);
2594 }
2595
2596 /* Return the reply type. */
RM_CallReplyType(RedisModuleCallReply * reply)2597 int RM_CallReplyType(RedisModuleCallReply *reply) {
2598 if (!reply) return REDISMODULE_REPLY_UNKNOWN;
2599 return reply->type;
2600 }
2601
2602 /* Return the reply type length, where applicable. */
RM_CallReplyLength(RedisModuleCallReply * reply)2603 size_t RM_CallReplyLength(RedisModuleCallReply *reply) {
2604 moduleParseCallReply(reply);
2605 switch(reply->type) {
2606 case REDISMODULE_REPLY_STRING:
2607 case REDISMODULE_REPLY_ERROR:
2608 case REDISMODULE_REPLY_ARRAY:
2609 return reply->len;
2610 default:
2611 return 0;
2612 }
2613 }
2614
2615 /* Return the 'idx'-th nested call reply element of an array reply, or NULL
2616 * if the reply type is wrong or the index is out of range. */
RM_CallReplyArrayElement(RedisModuleCallReply * reply,size_t idx)2617 RedisModuleCallReply *RM_CallReplyArrayElement(RedisModuleCallReply *reply, size_t idx) {
2618 moduleParseCallReply(reply);
2619 if (reply->type != REDISMODULE_REPLY_ARRAY) return NULL;
2620 if (idx >= reply->len) return NULL;
2621 return reply->val.array+idx;
2622 }
2623
2624 /* Return the long long of an integer reply. */
RM_CallReplyInteger(RedisModuleCallReply * reply)2625 long long RM_CallReplyInteger(RedisModuleCallReply *reply) {
2626 moduleParseCallReply(reply);
2627 if (reply->type != REDISMODULE_REPLY_INTEGER) return LLONG_MIN;
2628 return reply->val.ll;
2629 }
2630
2631 /* Return the pointer and length of a string or error reply. */
RM_CallReplyStringPtr(RedisModuleCallReply * reply,size_t * len)2632 const char *RM_CallReplyStringPtr(RedisModuleCallReply *reply, size_t *len) {
2633 moduleParseCallReply(reply);
2634 if (reply->type != REDISMODULE_REPLY_STRING &&
2635 reply->type != REDISMODULE_REPLY_ERROR) return NULL;
2636 if (len) *len = reply->len;
2637 return reply->val.str;
2638 }
2639
2640 /* Return a new string object from a call reply of type string, error or
2641 * integer. Otherwise (wrong reply type) return NULL. */
RM_CreateStringFromCallReply(RedisModuleCallReply * reply)2642 RedisModuleString *RM_CreateStringFromCallReply(RedisModuleCallReply *reply) {
2643 moduleParseCallReply(reply);
2644 switch(reply->type) {
2645 case REDISMODULE_REPLY_STRING:
2646 case REDISMODULE_REPLY_ERROR:
2647 return RM_CreateString(reply->ctx,reply->val.str,reply->len);
2648 case REDISMODULE_REPLY_INTEGER: {
2649 char buf[64];
2650 int len = ll2string(buf,sizeof(buf),reply->val.ll);
2651 return RM_CreateString(reply->ctx,buf,len);
2652 }
2653 default: return NULL;
2654 }
2655 }
2656
2657 /* Returns an array of robj pointers, and populates *argc with the number
2658 * of items, by parsing the format specifier "fmt" as described for
2659 * the RM_Call(), RM_Replicate() and other module APIs.
2660 *
2661 * The integer pointed by 'flags' is populated with flags according
2662 * to special modifiers in "fmt". For now only one exists:
2663 *
2664 * "!" -> REDISMODULE_ARGV_REPLICATE
2665 *
2666 * On error (format specifier error) NULL is returned and nothing is
2667 * allocated. On success the argument vector is returned. */
2668
2669 #define REDISMODULE_ARGV_REPLICATE (1<<0)
2670
moduleCreateArgvFromUserFormat(const char * cmdname,const char * fmt,int * argcp,int * flags,va_list ap)2671 robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap) {
2672 int argc = 0, argv_size, j;
2673 robj **argv = NULL;
2674
2675 /* As a first guess to avoid useless reallocations, size argv to
2676 * hold one argument for each char specifier in 'fmt'. */
2677 argv_size = strlen(fmt)+1; /* +1 because of the command name. */
2678 argv = zrealloc(argv,sizeof(robj*)*argv_size);
2679
2680 /* Build the arguments vector based on the format specifier. */
2681 argv[0] = createStringObject(cmdname,strlen(cmdname));
2682 argc++;
2683
2684 /* Create the client and dispatch the command. */
2685 const char *p = fmt;
2686 while(*p) {
2687 if (*p == 'c') {
2688 char *cstr = va_arg(ap,char*);
2689 argv[argc++] = createStringObject(cstr,strlen(cstr));
2690 } else if (*p == 's') {
2691 robj *obj = va_arg(ap,void*);
2692 argv[argc++] = obj;
2693 incrRefCount(obj);
2694 } else if (*p == 'b') {
2695 char *buf = va_arg(ap,char*);
2696 size_t len = va_arg(ap,size_t);
2697 argv[argc++] = createStringObject(buf,len);
2698 } else if (*p == 'l') {
2699 long ll = va_arg(ap,long long);
2700 argv[argc++] = createObject(OBJ_STRING,sdsfromlonglong(ll));
2701 } else if (*p == 'v') {
2702 /* A vector of strings */
2703 robj **v = va_arg(ap, void*);
2704 size_t vlen = va_arg(ap, size_t);
2705
2706 /* We need to grow argv to hold the vector's elements.
2707 * We resize by vector_len-1 elements, because we held
2708 * one element in argv for the vector already */
2709 argv_size += vlen-1;
2710 argv = zrealloc(argv,sizeof(robj*)*argv_size);
2711
2712 size_t i = 0;
2713 for (i = 0; i < vlen; i++) {
2714 incrRefCount(v[i]);
2715 argv[argc++] = v[i];
2716 }
2717 } else if (*p == '!') {
2718 if (flags) (*flags) |= REDISMODULE_ARGV_REPLICATE;
2719 } else {
2720 goto fmterr;
2721 }
2722 p++;
2723 }
2724 *argcp = argc;
2725 return argv;
2726
2727 fmterr:
2728 for (j = 0; j < argc; j++)
2729 decrRefCount(argv[j]);
2730 zfree(argv);
2731 return NULL;
2732 }
2733
2734 /* Exported API to call any Redis command from modules.
2735 * On success a RedisModuleCallReply object is returned, otherwise
2736 * NULL is returned and errno is set to the following values:
2737 *
2738 * EINVAL: command non existing, wrong arity, wrong format specifier.
2739 * EPERM: operation in Cluster instance with key in non local slot. */
RM_Call(RedisModuleCtx * ctx,const char * cmdname,const char * fmt,...)2740 RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
2741 struct redisCommand *cmd;
2742 client *c = NULL;
2743 robj **argv = NULL;
2744 int argc = 0, flags = 0;
2745 va_list ap;
2746 RedisModuleCallReply *reply = NULL;
2747 int replicate = 0; /* Replicate this command? */
2748
2749 /* Create the client and dispatch the command. */
2750 va_start(ap, fmt);
2751 c = createClient(-1);
2752 argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
2753 replicate = flags & REDISMODULE_ARGV_REPLICATE;
2754 va_end(ap);
2755
2756 /* Setup our fake client for command execution. */
2757 c->flags |= CLIENT_MODULE;
2758 c->db = ctx->client->db;
2759 c->argv = argv;
2760 c->argc = argc;
2761 if (ctx->module) ctx->module->in_call++;
2762
2763 /* We handle the above format error only when the client is setup so that
2764 * we can free it normally. */
2765 if (argv == NULL) goto cleanup;
2766
2767 /* Call command filters */
2768 moduleCallCommandFilters(c);
2769
2770 /* Lookup command now, after filters had a chance to make modifications
2771 * if necessary.
2772 */
2773 cmd = lookupCommand(c->argv[0]->ptr);
2774 if (!cmd) {
2775 errno = EINVAL;
2776 goto cleanup;
2777 }
2778 c->cmd = c->lastcmd = cmd;
2779
2780 /* Basic arity checks. */
2781 if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) {
2782 errno = EINVAL;
2783 goto cleanup;
2784 }
2785
2786 /* If this is a Redis Cluster node, we need to make sure the module is not
2787 * trying to access non-local keys, with the exception of commands
2788 * received from our master. */
2789 if (server.cluster_enabled && !(ctx->client->flags & CLIENT_MASTER)) {
2790 /* Duplicate relevant flags in the module client. */
2791 c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
2792 c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING);
2793 if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,NULL) !=
2794 server.cluster->myself)
2795 {
2796 errno = EPERM;
2797 goto cleanup;
2798 }
2799 }
2800
2801 /* If we are using single commands replication, we need to wrap what
2802 * we propagate into a MULTI/EXEC block, so that it will be atomic like
2803 * a Lua script in the context of AOF and slaves. */
2804 if (replicate) moduleReplicateMultiIfNeeded(ctx);
2805
2806 /* Run the command */
2807 int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
2808 if (replicate) {
2809 call_flags |= CMD_CALL_PROPAGATE_AOF;
2810 call_flags |= CMD_CALL_PROPAGATE_REPL;
2811 }
2812 call(c,call_flags);
2813
2814 /* Convert the result of the Redis command into a suitable Lua type.
2815 * The first thing we need is to create a single string from the client
2816 * output buffers. */
2817 sds proto = sdsnewlen(c->buf,c->bufpos);
2818 c->bufpos = 0;
2819 while(listLength(c->reply)) {
2820 clientReplyBlock *o = listNodeValue(listFirst(c->reply));
2821
2822 proto = sdscatlen(proto,o->buf,o->used);
2823 listDelNode(c->reply,listFirst(c->reply));
2824 }
2825 reply = moduleCreateCallReplyFromProto(ctx,proto);
2826 autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply);
2827
2828 cleanup:
2829 if (ctx->module) ctx->module->in_call--;
2830 freeClient(c);
2831 return reply;
2832 }
2833
2834 /* Return a pointer, and a length, to the protocol returned by the command
2835 * that returned the reply object. */
RM_CallReplyProto(RedisModuleCallReply * reply,size_t * len)2836 const char *RM_CallReplyProto(RedisModuleCallReply *reply, size_t *len) {
2837 if (reply->proto) *len = sdslen(reply->proto);
2838 return reply->proto;
2839 }
2840
2841 /* --------------------------------------------------------------------------
2842 * Modules data types
2843 *
2844 * When String DMA or using existing data structures is not enough, it is
2845 * possible to create new data types from scratch and export them to
2846 * Redis. The module must provide a set of callbacks for handling the
2847 * new values exported (for example in order to provide RDB saving/loading,
2848 * AOF rewrite, and so forth). In this section we define this API.
2849 * -------------------------------------------------------------------------- */
2850
2851 /* Turn a 9 chars name in the specified charset and a 10 bit encver into
2852 * a single 64 bit unsigned integer that represents this exact module name
2853 * and version. This final number is called a "type ID" and is used when
2854 * writing module exported values to RDB files, in order to re-associate the
2855 * value to the right module to load them during RDB loading.
2856 *
2857 * If the string is not of the right length or the charset is wrong, or
2858 * if encver is outside the unsigned 10 bit integer range, 0 is returned,
2859 * otherwise the function returns the right type ID.
2860 *
2861 * The resulting 64 bit integer is composed as follows:
2862 *
2863 * (high order bits) 6|6|6|6|6|6|6|6|6|10 (low order bits)
2864 *
2865 * The first 6 bits value is the first character, name[0], while the last
2866 * 6 bits value, immediately before the 10 bits integer, is name[8].
2867 * The last 10 bits are the encoding version.
2868 *
2869 * Note that a name and encver combo of "AAAAAAAAA" and 0, will produce
2870 * zero as return value, that is the same we use to signal errors, thus
2871 * this combination is invalid, and also useless since type names should
2872 * try to be vary to avoid collisions. */
2873
2874 const char *ModuleTypeNameCharSet =
2875 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
2876 "abcdefghijklmnopqrstuvwxyz"
2877 "0123456789-_";
2878
moduleTypeEncodeId(const char * name,int encver)2879 uint64_t moduleTypeEncodeId(const char *name, int encver) {
2880 /* We use 64 symbols so that we can map each character into 6 bits
2881 * of the final output. */
2882 const char *cset = ModuleTypeNameCharSet;
2883 if (strlen(name) != 9) return 0;
2884 if (encver < 0 || encver > 1023) return 0;
2885
2886 uint64_t id = 0;
2887 for (int j = 0; j < 9; j++) {
2888 char *p = strchr(cset,name[j]);
2889 if (!p) return 0;
2890 unsigned long pos = p-cset;
2891 id = (id << 6) | pos;
2892 }
2893 id = (id << 10) | encver;
2894 return id;
2895 }
2896
2897 /* Search, in the list of exported data types of all the modules registered,
2898 * a type with the same name as the one given. Returns the moduleType
2899 * structure pointer if such a module is found, or NULL otherwise. */
moduleTypeLookupModuleByName(const char * name)2900 moduleType *moduleTypeLookupModuleByName(const char *name) {
2901 dictIterator *di = dictGetIterator(modules);
2902 dictEntry *de;
2903
2904 while ((de = dictNext(di)) != NULL) {
2905 struct RedisModule *module = dictGetVal(de);
2906 listIter li;
2907 listNode *ln;
2908
2909 listRewind(module->types,&li);
2910 while((ln = listNext(&li))) {
2911 moduleType *mt = ln->value;
2912 if (memcmp(name,mt->name,sizeof(mt->name)) == 0) {
2913 dictReleaseIterator(di);
2914 return mt;
2915 }
2916 }
2917 }
2918 dictReleaseIterator(di);
2919 return NULL;
2920 }
2921
2922 /* Lookup a module by ID, with caching. This function is used during RDB
2923 * loading. Modules exporting data types should never be able to unload, so
2924 * our cache does not need to expire. */
2925 #define MODULE_LOOKUP_CACHE_SIZE 3
2926
moduleTypeLookupModuleByID(uint64_t id)2927 moduleType *moduleTypeLookupModuleByID(uint64_t id) {
2928 static struct {
2929 uint64_t id;
2930 moduleType *mt;
2931 } cache[MODULE_LOOKUP_CACHE_SIZE];
2932
2933 /* Search in cache to start. */
2934 int j;
2935 for (j = 0; j < MODULE_LOOKUP_CACHE_SIZE && cache[j].mt != NULL; j++)
2936 if (cache[j].id == id) return cache[j].mt;
2937
2938 /* Slow module by module lookup. */
2939 moduleType *mt = NULL;
2940 dictIterator *di = dictGetIterator(modules);
2941 dictEntry *de;
2942
2943 while ((de = dictNext(di)) != NULL && mt == NULL) {
2944 struct RedisModule *module = dictGetVal(de);
2945 listIter li;
2946 listNode *ln;
2947
2948 listRewind(module->types,&li);
2949 while((ln = listNext(&li))) {
2950 moduleType *this_mt = ln->value;
2951 /* Compare only the 54 bit module identifier and not the
2952 * encoding version. */
2953 if (this_mt->id >> 10 == id >> 10) {
2954 mt = this_mt;
2955 break;
2956 }
2957 }
2958 }
2959 dictReleaseIterator(di);
2960
2961 /* Add to cache if possible. */
2962 if (mt && j < MODULE_LOOKUP_CACHE_SIZE) {
2963 cache[j].id = id;
2964 cache[j].mt = mt;
2965 }
2966 return mt;
2967 }
2968
2969 /* Turn an (unresolved) module ID into a type name, to show the user an
2970 * error when RDB files contain module data we can't load.
2971 * The buffer pointed by 'name' must be 10 bytes at least. The function will
2972 * fill it with a null terminated module name. */
moduleTypeNameByID(char * name,uint64_t moduleid)2973 void moduleTypeNameByID(char *name, uint64_t moduleid) {
2974 const char *cset = ModuleTypeNameCharSet;
2975
2976 name[9] = '\0';
2977 char *p = name+8;
2978 moduleid >>= 10;
2979 for (int j = 0; j < 9; j++) {
2980 *p-- = cset[moduleid & 63];
2981 moduleid >>= 6;
2982 }
2983 }
2984
2985 /* Register a new data type exported by the module. The parameters are the
2986 * following. Please for in depth documentation check the modules API
2987 * documentation, especially the TYPES.md file.
2988 *
2989 * * **name**: A 9 characters data type name that MUST be unique in the Redis
2990 * Modules ecosystem. Be creative... and there will be no collisions. Use
2991 * the charset A-Z a-z 9-0, plus the two "-_" characters. A good
2992 * idea is to use, for example `<typename>-<vendor>`. For example
2993 * "tree-AntZ" may mean "Tree data structure by @antirez". To use both
2994 * lower case and upper case letters helps in order to prevent collisions.
2995 * * **encver**: Encoding version, which is, the version of the serialization
2996 * that a module used in order to persist data. As long as the "name"
2997 * matches, the RDB loading will be dispatched to the type callbacks
2998 * whatever 'encver' is used, however the module can understand if
2999 * the encoding it must load are of an older version of the module.
3000 * For example the module "tree-AntZ" initially used encver=0. Later
3001 * after an upgrade, it started to serialize data in a different format
3002 * and to register the type with encver=1. However this module may
3003 * still load old data produced by an older version if the rdb_load
3004 * callback is able to check the encver value and act accordingly.
3005 * The encver must be a positive value between 0 and 1023.
3006 * * **typemethods_ptr** is a pointer to a RedisModuleTypeMethods structure
3007 * that should be populated with the methods callbacks and structure
3008 * version, like in the following example:
3009 *
3010 * RedisModuleTypeMethods tm = {
3011 * .version = REDISMODULE_TYPE_METHOD_VERSION,
3012 * .rdb_load = myType_RDBLoadCallBack,
3013 * .rdb_save = myType_RDBSaveCallBack,
3014 * .aof_rewrite = myType_AOFRewriteCallBack,
3015 * .free = myType_FreeCallBack,
3016 *
3017 * // Optional fields
3018 * .digest = myType_DigestCallBack,
3019 * .mem_usage = myType_MemUsageCallBack,
3020 * }
3021 *
3022 * * **rdb_load**: A callback function pointer that loads data from RDB files.
3023 * * **rdb_save**: A callback function pointer that saves data to RDB files.
3024 * * **aof_rewrite**: A callback function pointer that rewrites data as commands.
3025 * * **digest**: A callback function pointer that is used for `DEBUG DIGEST`.
3026 * * **free**: A callback function pointer that can free a type value.
3027 *
3028 * The **digest* and **mem_usage** methods should currently be omitted since
3029 * they are not yet implemented inside the Redis modules core.
3030 *
3031 * Note: the module name "AAAAAAAAA" is reserved and produces an error, it
3032 * happens to be pretty lame as well.
3033 *
3034 * If there is already a module registering a type with the same name,
3035 * and if the module name or encver is invalid, NULL is returned.
3036 * Otherwise the new type is registered into Redis, and a reference of
3037 * type RedisModuleType is returned: the caller of the function should store
3038 * this reference into a gobal variable to make future use of it in the
3039 * modules type API, since a single module may register multiple types.
3040 * Example code fragment:
3041 *
3042 * static RedisModuleType *BalancedTreeType;
3043 *
3044 * int RedisModule_OnLoad(RedisModuleCtx *ctx) {
3045 * // some code here ...
3046 * BalancedTreeType = RM_CreateDataType(...);
3047 * }
3048 */
RM_CreateDataType(RedisModuleCtx * ctx,const char * name,int encver,void * typemethods_ptr)3049 moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, void *typemethods_ptr) {
3050 uint64_t id = moduleTypeEncodeId(name,encver);
3051 if (id == 0) return NULL;
3052 if (moduleTypeLookupModuleByName(name) != NULL) return NULL;
3053
3054 long typemethods_version = ((long*)typemethods_ptr)[0];
3055 if (typemethods_version == 0) return NULL;
3056
3057 struct typemethods {
3058 uint64_t version;
3059 moduleTypeLoadFunc rdb_load;
3060 moduleTypeSaveFunc rdb_save;
3061 moduleTypeRewriteFunc aof_rewrite;
3062 moduleTypeMemUsageFunc mem_usage;
3063 moduleTypeDigestFunc digest;
3064 moduleTypeFreeFunc free;
3065 } *tms = (struct typemethods*) typemethods_ptr;
3066
3067 moduleType *mt = zcalloc(sizeof(*mt));
3068 mt->id = id;
3069 mt->module = ctx->module;
3070 mt->rdb_load = tms->rdb_load;
3071 mt->rdb_save = tms->rdb_save;
3072 mt->aof_rewrite = tms->aof_rewrite;
3073 mt->mem_usage = tms->mem_usage;
3074 mt->digest = tms->digest;
3075 mt->free = tms->free;
3076 memcpy(mt->name,name,sizeof(mt->name));
3077 listAddNodeTail(ctx->module->types,mt);
3078 return mt;
3079 }
3080
3081 /* If the key is open for writing, set the specified module type object
3082 * as the value of the key, deleting the old value if any.
3083 * On success REDISMODULE_OK is returned. If the key is not open for
3084 * writing or there is an active iterator, REDISMODULE_ERR is returned. */
RM_ModuleTypeSetValue(RedisModuleKey * key,moduleType * mt,void * value)3085 int RM_ModuleTypeSetValue(RedisModuleKey *key, moduleType *mt, void *value) {
3086 if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
3087 RM_DeleteKey(key);
3088 robj *o = createModuleObject(mt,value);
3089 setKey(key->db,key->key,o);
3090 decrRefCount(o);
3091 key->value = o;
3092 return REDISMODULE_OK;
3093 }
3094
3095 /* Assuming RedisModule_KeyType() returned REDISMODULE_KEYTYPE_MODULE on
3096 * the key, returns the module type pointer of the value stored at key.
3097 *
3098 * If the key is NULL, is not associated with a module type, or is empty,
3099 * then NULL is returned instead. */
RM_ModuleTypeGetType(RedisModuleKey * key)3100 moduleType *RM_ModuleTypeGetType(RedisModuleKey *key) {
3101 if (key == NULL ||
3102 key->value == NULL ||
3103 RM_KeyType(key) != REDISMODULE_KEYTYPE_MODULE) return NULL;
3104 moduleValue *mv = key->value->ptr;
3105 return mv->type;
3106 }
3107
3108 /* Assuming RedisModule_KeyType() returned REDISMODULE_KEYTYPE_MODULE on
3109 * the key, returns the module type low-level value stored at key, as
3110 * it was set by the user via RedisModule_ModuleTypeSet().
3111 *
3112 * If the key is NULL, is not associated with a module type, or is empty,
3113 * then NULL is returned instead. */
RM_ModuleTypeGetValue(RedisModuleKey * key)3114 void *RM_ModuleTypeGetValue(RedisModuleKey *key) {
3115 if (key == NULL ||
3116 key->value == NULL ||
3117 RM_KeyType(key) != REDISMODULE_KEYTYPE_MODULE) return NULL;
3118 moduleValue *mv = key->value->ptr;
3119 return mv->value;
3120 }
3121
3122 /* --------------------------------------------------------------------------
3123 * RDB loading and saving functions
3124 * -------------------------------------------------------------------------- */
3125
3126 /* Called when there is a load error in the context of a module. This cannot
3127 * be recovered like for the built-in types. */
moduleRDBLoadError(RedisModuleIO * io)3128 void moduleRDBLoadError(RedisModuleIO *io) {
3129 serverLog(LL_WARNING,
3130 "Error loading data from RDB (short read or EOF). "
3131 "Read performed by module '%s' about type '%s' "
3132 "after reading '%llu' bytes of a value.",
3133 io->type->module->name,
3134 io->type->name,
3135 (unsigned long long)io->bytes);
3136 exit(1);
3137 }
3138
3139 /* Save an unsigned 64 bit value into the RDB file. This function should only
3140 * be called in the context of the rdb_save method of modules implementing new
3141 * data types. */
RM_SaveUnsigned(RedisModuleIO * io,uint64_t value)3142 void RM_SaveUnsigned(RedisModuleIO *io, uint64_t value) {
3143 if (io->error) return;
3144 /* Save opcode. */
3145 int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_UINT);
3146 if (retval == -1) goto saveerr;
3147 io->bytes += retval;
3148 /* Save value. */
3149 retval = rdbSaveLen(io->rio, value);
3150 if (retval == -1) goto saveerr;
3151 io->bytes += retval;
3152 return;
3153
3154 saveerr:
3155 io->error = 1;
3156 }
3157
3158 /* Load an unsigned 64 bit value from the RDB file. This function should only
3159 * be called in the context of the rdb_load method of modules implementing
3160 * new data types. */
RM_LoadUnsigned(RedisModuleIO * io)3161 uint64_t RM_LoadUnsigned(RedisModuleIO *io) {
3162 if (io->ver == 2) {
3163 uint64_t opcode = rdbLoadLen(io->rio,NULL);
3164 if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr;
3165 }
3166 uint64_t value;
3167 int retval = rdbLoadLenByRef(io->rio, NULL, &value);
3168 if (retval == -1) goto loaderr;
3169 return value;
3170
3171 loaderr:
3172 moduleRDBLoadError(io);
3173 return 0; /* Never reached. */
3174 }
3175
3176 /* Like RedisModule_SaveUnsigned() but for signed 64 bit values. */
RM_SaveSigned(RedisModuleIO * io,int64_t value)3177 void RM_SaveSigned(RedisModuleIO *io, int64_t value) {
3178 union {uint64_t u; int64_t i;} conv;
3179 conv.i = value;
3180 RM_SaveUnsigned(io,conv.u);
3181 }
3182
3183 /* Like RedisModule_LoadUnsigned() but for signed 64 bit values. */
RM_LoadSigned(RedisModuleIO * io)3184 int64_t RM_LoadSigned(RedisModuleIO *io) {
3185 union {uint64_t u; int64_t i;} conv;
3186 conv.u = RM_LoadUnsigned(io);
3187 return conv.i;
3188 }
3189
3190 /* In the context of the rdb_save method of a module type, saves a
3191 * string into the RDB file taking as input a RedisModuleString.
3192 *
3193 * The string can be later loaded with RedisModule_LoadString() or
3194 * other Load family functions expecting a serialized string inside
3195 * the RDB file. */
RM_SaveString(RedisModuleIO * io,RedisModuleString * s)3196 void RM_SaveString(RedisModuleIO *io, RedisModuleString *s) {
3197 if (io->error) return;
3198 /* Save opcode. */
3199 ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING);
3200 if (retval == -1) goto saveerr;
3201 io->bytes += retval;
3202 /* Save value. */
3203 retval = rdbSaveStringObject(io->rio, s);
3204 if (retval == -1) goto saveerr;
3205 io->bytes += retval;
3206 return;
3207
3208 saveerr:
3209 io->error = 1;
3210 }
3211
3212 /* Like RedisModule_SaveString() but takes a raw C pointer and length
3213 * as input. */
RM_SaveStringBuffer(RedisModuleIO * io,const char * str,size_t len)3214 void RM_SaveStringBuffer(RedisModuleIO *io, const char *str, size_t len) {
3215 if (io->error) return;
3216 /* Save opcode. */
3217 ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING);
3218 if (retval == -1) goto saveerr;
3219 io->bytes += retval;
3220 /* Save value. */
3221 retval = rdbSaveRawString(io->rio, (unsigned char*)str,len);
3222 if (retval == -1) goto saveerr;
3223 io->bytes += retval;
3224 return;
3225
3226 saveerr:
3227 io->error = 1;
3228 }
3229
3230 /* Implements RM_LoadString() and RM_LoadStringBuffer() */
moduleLoadString(RedisModuleIO * io,int plain,size_t * lenptr)3231 void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) {
3232 if (io->ver == 2) {
3233 uint64_t opcode = rdbLoadLen(io->rio,NULL);
3234 if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr;
3235 }
3236 void *s = rdbGenericLoadStringObject(io->rio,
3237 plain ? RDB_LOAD_PLAIN : RDB_LOAD_NONE, lenptr);
3238 if (s == NULL) goto loaderr;
3239 return s;
3240
3241 loaderr:
3242 moduleRDBLoadError(io);
3243 return NULL; /* Never reached. */
3244 }
3245
3246 /* In the context of the rdb_load method of a module data type, loads a string
3247 * from the RDB file, that was previously saved with RedisModule_SaveString()
3248 * functions family.
3249 *
3250 * The returned string is a newly allocated RedisModuleString object, and
3251 * the user should at some point free it with a call to RedisModule_FreeString().
3252 *
3253 * If the data structure does not store strings as RedisModuleString objects,
3254 * the similar function RedisModule_LoadStringBuffer() could be used instead. */
RM_LoadString(RedisModuleIO * io)3255 RedisModuleString *RM_LoadString(RedisModuleIO *io) {
3256 return moduleLoadString(io,0,NULL);
3257 }
3258
3259 /* Like RedisModule_LoadString() but returns an heap allocated string that
3260 * was allocated with RedisModule_Alloc(), and can be resized or freed with
3261 * RedisModule_Realloc() or RedisModule_Free().
3262 *
3263 * The size of the string is stored at '*lenptr' if not NULL.
3264 * The returned string is not automatically NULL termianted, it is loaded
3265 * exactly as it was stored inisde the RDB file. */
RM_LoadStringBuffer(RedisModuleIO * io,size_t * lenptr)3266 char *RM_LoadStringBuffer(RedisModuleIO *io, size_t *lenptr) {
3267 return moduleLoadString(io,1,lenptr);
3268 }
3269
3270 /* In the context of the rdb_save method of a module data type, saves a double
3271 * value to the RDB file. The double can be a valid number, a NaN or infinity.
3272 * It is possible to load back the value with RedisModule_LoadDouble(). */
RM_SaveDouble(RedisModuleIO * io,double value)3273 void RM_SaveDouble(RedisModuleIO *io, double value) {
3274 if (io->error) return;
3275 /* Save opcode. */
3276 int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_DOUBLE);
3277 if (retval == -1) goto saveerr;
3278 io->bytes += retval;
3279 /* Save value. */
3280 retval = rdbSaveBinaryDoubleValue(io->rio, value);
3281 if (retval == -1) goto saveerr;
3282 io->bytes += retval;
3283 return;
3284
3285 saveerr:
3286 io->error = 1;
3287 }
3288
3289 /* In the context of the rdb_save method of a module data type, loads back the
3290 * double value saved by RedisModule_SaveDouble(). */
RM_LoadDouble(RedisModuleIO * io)3291 double RM_LoadDouble(RedisModuleIO *io) {
3292 if (io->ver == 2) {
3293 uint64_t opcode = rdbLoadLen(io->rio,NULL);
3294 if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr;
3295 }
3296 double value;
3297 int retval = rdbLoadBinaryDoubleValue(io->rio, &value);
3298 if (retval == -1) goto loaderr;
3299 return value;
3300
3301 loaderr:
3302 moduleRDBLoadError(io);
3303 return 0; /* Never reached. */
3304 }
3305
3306 /* In the context of the rdb_save method of a module data type, saves a float
3307 * value to the RDB file. The float can be a valid number, a NaN or infinity.
3308 * It is possible to load back the value with RedisModule_LoadFloat(). */
RM_SaveFloat(RedisModuleIO * io,float value)3309 void RM_SaveFloat(RedisModuleIO *io, float value) {
3310 if (io->error) return;
3311 /* Save opcode. */
3312 int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_FLOAT);
3313 if (retval == -1) goto saveerr;
3314 io->bytes += retval;
3315 /* Save value. */
3316 retval = rdbSaveBinaryFloatValue(io->rio, value);
3317 if (retval == -1) goto saveerr;
3318 io->bytes += retval;
3319 return;
3320
3321 saveerr:
3322 io->error = 1;
3323 }
3324
3325 /* In the context of the rdb_save method of a module data type, loads back the
3326 * float value saved by RedisModule_SaveFloat(). */
RM_LoadFloat(RedisModuleIO * io)3327 float RM_LoadFloat(RedisModuleIO *io) {
3328 if (io->ver == 2) {
3329 uint64_t opcode = rdbLoadLen(io->rio,NULL);
3330 if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr;
3331 }
3332 float value;
3333 int retval = rdbLoadBinaryFloatValue(io->rio, &value);
3334 if (retval == -1) goto loaderr;
3335 return value;
3336
3337 loaderr:
3338 moduleRDBLoadError(io);
3339 return 0; /* Never reached. */
3340 }
3341
3342 /* --------------------------------------------------------------------------
3343 * Key digest API (DEBUG DIGEST interface for modules types)
3344 * -------------------------------------------------------------------------- */
3345
3346 /* Add a new element to the digest. This function can be called multiple times
3347 * one element after the other, for all the elements that constitute a given
3348 * data structure. The function call must be followed by the call to
3349 * `RedisModule_DigestEndSequence` eventually, when all the elements that are
3350 * always in a given order are added. See the Redis Modules data types
3351 * documentation for more info. However this is a quick example that uses Redis
3352 * data types as an example.
3353 *
3354 * To add a sequence of unordered elements (for example in the case of a Redis
3355 * Set), the pattern to use is:
3356 *
3357 * foreach element {
3358 * AddElement(element);
3359 * EndSequence();
3360 * }
3361 *
3362 * Because Sets are not ordered, so every element added has a position that
3363 * does not depend from the other. However if instead our elements are
3364 * ordered in pairs, like field-value pairs of an Hash, then one should
3365 * use:
3366 *
3367 * foreach key,value {
3368 * AddElement(key);
3369 * AddElement(value);
3370 * EndSquence();
3371 * }
3372 *
3373 * Because the key and value will be always in the above order, while instead
3374 * the single key-value pairs, can appear in any position into a Redis hash.
3375 *
3376 * A list of ordered elements would be implemented with:
3377 *
3378 * foreach element {
3379 * AddElement(element);
3380 * }
3381 * EndSequence();
3382 *
3383 */
RM_DigestAddStringBuffer(RedisModuleDigest * md,unsigned char * ele,size_t len)3384 void RM_DigestAddStringBuffer(RedisModuleDigest *md, unsigned char *ele, size_t len) {
3385 mixDigest(md->o,ele,len);
3386 }
3387
3388 /* Like `RedisModule_DigestAddStringBuffer()` but takes a long long as input
3389 * that gets converted into a string before adding it to the digest. */
RM_DigestAddLongLong(RedisModuleDigest * md,long long ll)3390 void RM_DigestAddLongLong(RedisModuleDigest *md, long long ll) {
3391 char buf[LONG_STR_SIZE];
3392 size_t len = ll2string(buf,sizeof(buf),ll);
3393 mixDigest(md->o,buf,len);
3394 }
3395
3396 /* See the documentation for `RedisModule_DigestAddElement()`. */
RM_DigestEndSequence(RedisModuleDigest * md)3397 void RM_DigestEndSequence(RedisModuleDigest *md) {
3398 xorDigest(md->x,md->o,sizeof(md->o));
3399 memset(md->o,0,sizeof(md->o));
3400 }
3401
3402 /* --------------------------------------------------------------------------
3403 * AOF API for modules data types
3404 * -------------------------------------------------------------------------- */
3405
3406 /* Emits a command into the AOF during the AOF rewriting process. This function
3407 * is only called in the context of the aof_rewrite method of data types exported
3408 * by a module. The command works exactly like RedisModule_Call() in the way
3409 * the parameters are passed, but it does not return anything as the error
3410 * handling is performed by Redis itself. */
RM_EmitAOF(RedisModuleIO * io,const char * cmdname,const char * fmt,...)3411 void RM_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...) {
3412 if (io->error) return;
3413 struct redisCommand *cmd;
3414 robj **argv = NULL;
3415 int argc = 0, flags = 0, j;
3416 va_list ap;
3417
3418 cmd = lookupCommandByCString((char*)cmdname);
3419 if (!cmd) {
3420 serverLog(LL_WARNING,
3421 "Fatal: AOF method for module data type '%s' tried to "
3422 "emit unknown command '%s'",
3423 io->type->name, cmdname);
3424 io->error = 1;
3425 errno = EINVAL;
3426 return;
3427 }
3428
3429 /* Emit the arguments into the AOF in Redis protocol format. */
3430 va_start(ap, fmt);
3431 argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
3432 va_end(ap);
3433 if (argv == NULL) {
3434 serverLog(LL_WARNING,
3435 "Fatal: AOF method for module data type '%s' tried to "
3436 "call RedisModule_EmitAOF() with wrong format specifiers '%s'",
3437 io->type->name, fmt);
3438 io->error = 1;
3439 errno = EINVAL;
3440 return;
3441 }
3442
3443 /* Bulk count. */
3444 if (!io->error && rioWriteBulkCount(io->rio,'*',argc) == 0)
3445 io->error = 1;
3446
3447 /* Arguments. */
3448 for (j = 0; j < argc; j++) {
3449 if (!io->error && rioWriteBulkObject(io->rio,argv[j]) == 0)
3450 io->error = 1;
3451 decrRefCount(argv[j]);
3452 }
3453 zfree(argv);
3454 return;
3455 }
3456
3457 /* --------------------------------------------------------------------------
3458 * IO context handling
3459 * -------------------------------------------------------------------------- */
3460
RM_GetContextFromIO(RedisModuleIO * io)3461 RedisModuleCtx *RM_GetContextFromIO(RedisModuleIO *io) {
3462 if (io->ctx) return io->ctx; /* Can't have more than one... */
3463 RedisModuleCtx ctxtemplate = REDISMODULE_CTX_INIT;
3464 io->ctx = zmalloc(sizeof(RedisModuleCtx));
3465 *(io->ctx) = ctxtemplate;
3466 io->ctx->module = io->type->module;
3467 io->ctx->client = NULL;
3468 return io->ctx;
3469 }
3470
3471 /* Returns a RedisModuleString with the name of the key currently saving or
3472 * loading, when an IO data type callback is called. There is no guarantee
3473 * that the key name is always available, so this may return NULL.
3474 */
RM_GetKeyNameFromIO(RedisModuleIO * io)3475 const RedisModuleString *RM_GetKeyNameFromIO(RedisModuleIO *io) {
3476 return io->key;
3477 }
3478
3479 /* --------------------------------------------------------------------------
3480 * Logging
3481 * -------------------------------------------------------------------------- */
3482
3483 /* This is the low level function implementing both:
3484 *
3485 * RM_Log()
3486 * RM_LogIOError()
3487 *
3488 */
RM_LogRaw(RedisModule * module,const char * levelstr,const char * fmt,va_list ap)3489 void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_list ap) {
3490 char msg[LOG_MAX_LEN];
3491 size_t name_len;
3492 int level;
3493
3494 if (!strcasecmp(levelstr,"debug")) level = LL_DEBUG;
3495 else if (!strcasecmp(levelstr,"verbose")) level = LL_VERBOSE;
3496 else if (!strcasecmp(levelstr,"notice")) level = LL_NOTICE;
3497 else if (!strcasecmp(levelstr,"warning")) level = LL_WARNING;
3498 else level = LL_VERBOSE; /* Default. */
3499
3500 if (level < server.verbosity) return;
3501
3502 name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name);
3503 vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap);
3504 serverLogRaw(level,msg);
3505 }
3506
3507 /* Produces a log message to the standard Redis log, the format accepts
3508 * printf-alike specifiers, while level is a string describing the log
3509 * level to use when emitting the log, and must be one of the following:
3510 *
3511 * * "debug"
3512 * * "verbose"
3513 * * "notice"
3514 * * "warning"
3515 *
3516 * If the specified log level is invalid, verbose is used by default.
3517 * There is a fixed limit to the length of the log line this function is able
3518 * to emit, this limit is not specified but is guaranteed to be more than
3519 * a few lines of text.
3520 */
RM_Log(RedisModuleCtx * ctx,const char * levelstr,const char * fmt,...)3521 void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) {
3522 if (!ctx->module) return; /* Can only log if module is initialized */
3523
3524 va_list ap;
3525 va_start(ap, fmt);
3526 RM_LogRaw(ctx->module,levelstr,fmt,ap);
3527 va_end(ap);
3528 }
3529
3530 /* Log errors from RDB / AOF serialization callbacks.
3531 *
3532 * This function should be used when a callback is returning a critical
3533 * error to the caller since cannot load or save the data for some
3534 * critical reason. */
RM_LogIOError(RedisModuleIO * io,const char * levelstr,const char * fmt,...)3535 void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ...) {
3536 va_list ap;
3537 va_start(ap, fmt);
3538 RM_LogRaw(io->type->module,levelstr,fmt,ap);
3539 va_end(ap);
3540 }
3541
3542 /* --------------------------------------------------------------------------
3543 * Blocking clients from modules
3544 * -------------------------------------------------------------------------- */
3545
3546 /* Readable handler for the awake pipe. We do nothing here, the awake bytes
3547 * will be actually read in a more appropriate place in the
3548 * moduleHandleBlockedClients() function that is where clients are actually
3549 * served. */
moduleBlockedClientPipeReadable(aeEventLoop * el,int fd,void * privdata,int mask)3550 void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
3551 UNUSED(el);
3552 UNUSED(fd);
3553 UNUSED(mask);
3554 UNUSED(privdata);
3555 }
3556
3557 /* This is called from blocked.c in order to unblock a client: may be called
3558 * for multiple reasons while the client is in the middle of being blocked
3559 * because the client is terminated, but is also called for cleanup when a
3560 * client is unblocked in a clean way after replaying.
3561 *
3562 * What we do here is just to set the client to NULL in the redis module
3563 * blocked client handle. This way if the client is terminated while there
3564 * is a pending threaded operation involving the blocked client, we'll know
3565 * that the client no longer exists and no reply callback should be called.
3566 *
3567 * The structure RedisModuleBlockedClient will be always deallocated when
3568 * running the list of clients blocked by a module that need to be unblocked. */
unblockClientFromModule(client * c)3569 void unblockClientFromModule(client *c) {
3570 RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
3571
3572 /* Call the disconnection callback if any. */
3573 if (bc->disconnect_callback) {
3574 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3575 ctx.blocked_privdata = bc->privdata;
3576 ctx.module = bc->module;
3577 ctx.client = bc->client;
3578 bc->disconnect_callback(&ctx,bc);
3579 moduleFreeContext(&ctx);
3580 }
3581
3582 bc->client = NULL;
3583 /* Reset the client for a new query since, for blocking commands implemented
3584 * into modules, we do not it immediately after the command returns (and
3585 * the client blocks) in order to be still able to access the argument
3586 * vector from callbacks. */
3587 resetClient(c);
3588 }
3589
3590 /* Block a client in the context of a blocking command, returning an handle
3591 * which will be used, later, in order to unblock the client with a call to
3592 * RedisModule_UnblockClient(). The arguments specify callback functions
3593 * and a timeout after which the client is unblocked.
3594 *
3595 * The callbacks are called in the following contexts:
3596 *
3597 * reply_callback: called after a successful RedisModule_UnblockClient()
3598 * call in order to reply to the client and unblock it.
3599 *
3600 * reply_timeout: called when the timeout is reached in order to send an
3601 * error to the client.
3602 *
3603 * free_privdata: called in order to free the private data that is passed
3604 * by RedisModule_UnblockClient() call.
3605 */
RM_BlockClient(RedisModuleCtx * ctx,RedisModuleCmdFunc reply_callback,RedisModuleCmdFunc timeout_callback,void (* free_privdata)(RedisModuleCtx *,void *),long long timeout_ms)3606 RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
3607 client *c = ctx->client;
3608 int islua = c->flags & CLIENT_LUA;
3609 int ismulti = c->flags & CLIENT_MULTI;
3610
3611 c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
3612 RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
3613
3614 /* We need to handle the invalid operation of calling modules blocking
3615 * commands from Lua or MULTI. We actually create an already aborted
3616 * (client set to NULL) blocked client handle, and actually reply with
3617 * an error. */
3618 bc->client = (islua || ismulti) ? NULL : c;
3619 bc->module = ctx->module;
3620 bc->reply_callback = reply_callback;
3621 bc->timeout_callback = timeout_callback;
3622 bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
3623 bc->free_privdata = free_privdata;
3624 bc->privdata = NULL;
3625 bc->reply_client = createClient(-1);
3626 bc->reply_client->flags |= CLIENT_MODULE;
3627 bc->dbid = c->db->id;
3628 c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
3629
3630 if (islua || ismulti) {
3631 c->bpop.module_blocked_handle = NULL;
3632 addReplyError(c, islua ?
3633 "Blocking module command called from Lua script" :
3634 "Blocking module command called from transaction");
3635 } else {
3636 blockClient(c,BLOCKED_MODULE);
3637 }
3638 return bc;
3639 }
3640
3641 /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
3642 * the reply callbacks to be called in order to reply to the client.
3643 * The 'privdata' argument will be accessible by the reply callback, so
3644 * the caller of this function can pass any value that is needed in order to
3645 * actually reply to the client.
3646 *
3647 * A common usage for 'privdata' is a thread that computes something that
3648 * needs to be passed to the client, included but not limited some slow
3649 * to compute reply or some reply obtained via networking.
3650 *
3651 * Note: this function can be called from threads spawned by the module. */
RM_UnblockClient(RedisModuleBlockedClient * bc,void * privdata)3652 int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
3653 pthread_mutex_lock(&moduleUnblockedClientsMutex);
3654 bc->privdata = privdata;
3655 listAddNodeTail(moduleUnblockedClients,bc);
3656 if (write(server.module_blocked_pipe[1],"A",1) != 1) {
3657 /* Ignore the error, this is best-effort. */
3658 }
3659 pthread_mutex_unlock(&moduleUnblockedClientsMutex);
3660 return REDISMODULE_OK;
3661 }
3662
3663 /* Abort a blocked client blocking operation: the client will be unblocked
3664 * without firing any callback. */
RM_AbortBlock(RedisModuleBlockedClient * bc)3665 int RM_AbortBlock(RedisModuleBlockedClient *bc) {
3666 bc->reply_callback = NULL;
3667 bc->disconnect_callback = NULL;
3668 return RM_UnblockClient(bc,NULL);
3669 }
3670
3671 /* Set a callback that will be called if a blocked client disconnects
3672 * before the module has a chance to call RedisModule_UnblockClient()
3673 *
3674 * Usually what you want to do there, is to cleanup your module state
3675 * so that you can call RedisModule_UnblockClient() safely, otherwise
3676 * the client will remain blocked forever if the timeout is large.
3677 *
3678 * Notes:
3679 *
3680 * 1. It is not safe to call Reply* family functions here, it is also
3681 * useless since the client is gone.
3682 *
3683 * 2. This callback is not called if the client disconnects because of
3684 * a timeout. In such a case, the client is unblocked automatically
3685 * and the timeout callback is called.
3686 */
RM_SetDisconnectCallback(RedisModuleBlockedClient * bc,RedisModuleDisconnectFunc callback)3687 void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback) {
3688 bc->disconnect_callback = callback;
3689 }
3690
3691 /* This function will check the moduleUnblockedClients queue in order to
3692 * call the reply callback and really unblock the client.
3693 *
3694 * Clients end into this list because of calls to RM_UnblockClient(),
3695 * however it is possible that while the module was doing work for the
3696 * blocked client, it was terminated by Redis (for timeout or other reasons).
3697 * When this happens the RedisModuleBlockedClient structure in the queue
3698 * will have the 'client' field set to NULL. */
moduleHandleBlockedClients(void)3699 void moduleHandleBlockedClients(void) {
3700 listNode *ln;
3701 RedisModuleBlockedClient *bc;
3702
3703 pthread_mutex_lock(&moduleUnblockedClientsMutex);
3704 /* Here we unblock all the pending clients blocked in modules operations
3705 * so we can read every pending "awake byte" in the pipe. */
3706 char buf[1];
3707 while (read(server.module_blocked_pipe[0],buf,1) == 1);
3708 while (listLength(moduleUnblockedClients)) {
3709 ln = listFirst(moduleUnblockedClients);
3710 bc = ln->value;
3711 client *c = bc->client;
3712 listDelNode(moduleUnblockedClients,ln);
3713 pthread_mutex_unlock(&moduleUnblockedClientsMutex);
3714
3715 /* Release the lock during the loop, as long as we don't
3716 * touch the shared list. */
3717
3718 /* Call the reply callback if the client is valid and we have
3719 * any callback. */
3720 if (c && bc->reply_callback) {
3721 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3722 ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
3723 ctx.blocked_privdata = bc->privdata;
3724 ctx.module = bc->module;
3725 ctx.client = bc->client;
3726 ctx.blocked_client = bc;
3727 bc->reply_callback(&ctx,(void**)c->argv,c->argc);
3728 moduleHandlePropagationAfterCommandCallback(&ctx);
3729 moduleFreeContext(&ctx);
3730 }
3731
3732 /* Free privdata if any. */
3733 if (bc->privdata && bc->free_privdata) {
3734 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3735 if (c == NULL)
3736 ctx.flags |= REDISMODULE_CTX_BLOCKED_DISCONNECTED;
3737 ctx.blocked_privdata = bc->privdata;
3738 ctx.module = bc->module;
3739 ctx.client = bc->client;
3740 bc->free_privdata(&ctx,bc->privdata);
3741 moduleFreeContext(&ctx);
3742 }
3743
3744 /* It is possible that this blocked client object accumulated
3745 * replies to send to the client in a thread safe context.
3746 * We need to glue such replies to the client output buffer and
3747 * free the temporary client we just used for the replies. */
3748 if (c) AddReplyFromClient(c, bc->reply_client);
3749 freeClient(bc->reply_client);
3750
3751 if (c != NULL) {
3752 /* Before unblocking the client, set the disconnect callback
3753 * to NULL, because if we reached this point, the client was
3754 * properly unblocked by the module. */
3755 bc->disconnect_callback = NULL;
3756 unblockClient(c);
3757 /* Put the client in the list of clients that need to write
3758 * if there are pending replies here. This is needed since
3759 * during a non blocking command the client may receive output. */
3760 if (clientHasPendingReplies(c) &&
3761 !(c->flags & CLIENT_PENDING_WRITE))
3762 {
3763 c->flags |= CLIENT_PENDING_WRITE;
3764 listAddNodeHead(server.clients_pending_write,c);
3765 }
3766 }
3767
3768 /* Free 'bc' only after unblocking the client, since it is
3769 * referenced in the client blocking context, and must be valid
3770 * when calling unblockClient(). */
3771 zfree(bc);
3772
3773 /* Lock again before to iterate the loop. */
3774 pthread_mutex_lock(&moduleUnblockedClientsMutex);
3775 }
3776 pthread_mutex_unlock(&moduleUnblockedClientsMutex);
3777 }
3778
3779 /* Called when our client timed out. After this function unblockClient()
3780 * is called, and it will invalidate the blocked client. So this function
3781 * does not need to do any cleanup. Eventually the module will call the
3782 * API to unblock the client and the memory will be released. */
moduleBlockedClientTimedOut(client * c)3783 void moduleBlockedClientTimedOut(client *c) {
3784 RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
3785 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3786 ctx.flags |= REDISMODULE_CTX_BLOCKED_TIMEOUT;
3787 ctx.module = bc->module;
3788 ctx.client = bc->client;
3789 ctx.blocked_client = bc;
3790 bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
3791 moduleFreeContext(&ctx);
3792 /* For timeout events, we do not want to call the disconnect callback,
3793 * because the blocked client will be automatically disconnected in
3794 * this case, and the user can still hook using the timeout callback. */
3795 bc->disconnect_callback = NULL;
3796 }
3797
3798 /* Return non-zero if a module command was called in order to fill the
3799 * reply for a blocked client. */
RM_IsBlockedReplyRequest(RedisModuleCtx * ctx)3800 int RM_IsBlockedReplyRequest(RedisModuleCtx *ctx) {
3801 return (ctx->flags & REDISMODULE_CTX_BLOCKED_REPLY) != 0;
3802 }
3803
3804 /* Return non-zero if a module command was called in order to fill the
3805 * reply for a blocked client that timed out. */
RM_IsBlockedTimeoutRequest(RedisModuleCtx * ctx)3806 int RM_IsBlockedTimeoutRequest(RedisModuleCtx *ctx) {
3807 return (ctx->flags & REDISMODULE_CTX_BLOCKED_TIMEOUT) != 0;
3808 }
3809
3810 /* Get the private data set by RedisModule_UnblockClient() */
RM_GetBlockedClientPrivateData(RedisModuleCtx * ctx)3811 void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
3812 return ctx->blocked_privdata;
3813 }
3814
3815 /* Get the blocked client associated with a given context.
3816 * This is useful in the reply and timeout callbacks of blocked clients,
3817 * before sometimes the module has the blocked client handle references
3818 * around, and wants to cleanup it. */
RM_GetBlockedClientHandle(RedisModuleCtx * ctx)3819 RedisModuleBlockedClient *RM_GetBlockedClientHandle(RedisModuleCtx *ctx) {
3820 return ctx->blocked_client;
3821 }
3822
3823 /* Return true if when the free callback of a blocked client is called,
3824 * the reason for the client to be unblocked is that it disconnected
3825 * while it was blocked. */
RM_BlockedClientDisconnected(RedisModuleCtx * ctx)3826 int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) {
3827 return (ctx->flags & REDISMODULE_CTX_BLOCKED_DISCONNECTED) != 0;
3828 }
3829
3830 /* --------------------------------------------------------------------------
3831 * Thread Safe Contexts
3832 * -------------------------------------------------------------------------- */
3833
3834 /* Return a context which can be used inside threads to make Redis context
3835 * calls with certain modules APIs. If 'bc' is not NULL then the module will
3836 * be bound to a blocked client, and it will be possible to use the
3837 * `RedisModule_Reply*` family of functions to accumulate a reply for when the
3838 * client will be unblocked. Otherwise the thread safe context will be
3839 * detached by a specific client.
3840 *
3841 * To call non-reply APIs, the thread safe context must be prepared with:
3842 *
3843 * RedisModule_ThreadSafeCallStart(ctx);
3844 * ... make your call here ...
3845 * RedisModule_ThreadSafeCallStop(ctx);
3846 *
3847 * This is not needed when using `RedisModule_Reply*` functions, assuming
3848 * that a blocked client was used when the context was created, otherwise
3849 * no RedisModule_Reply* call should be made at all.
3850 *
3851 * TODO: thread safe contexts do not inherit the blocked client
3852 * selected database. */
RM_GetThreadSafeContext(RedisModuleBlockedClient * bc)3853 RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
3854 RedisModuleCtx *ctx = zmalloc(sizeof(*ctx));
3855 RedisModuleCtx empty = REDISMODULE_CTX_INIT;
3856 memcpy(ctx,&empty,sizeof(empty));
3857 if (bc) {
3858 ctx->blocked_client = bc;
3859 ctx->module = bc->module;
3860 }
3861 ctx->flags |= REDISMODULE_CTX_THREAD_SAFE;
3862 /* Even when the context is associated with a blocked client, we can't
3863 * access it safely from another thread, so we create a fake client here
3864 * in order to keep things like the currently selected database and similar
3865 * things. */
3866 ctx->client = createClient(-1);
3867 if (bc) {
3868 selectDb(ctx->client,bc->dbid);
3869 ctx->client->id = bc->client->id;
3870 }
3871 return ctx;
3872 }
3873
3874 /* Release a thread safe context. */
RM_FreeThreadSafeContext(RedisModuleCtx * ctx)3875 void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
3876 moduleFreeContext(ctx);
3877 zfree(ctx);
3878 }
3879
3880 /* Acquire the server lock before executing a thread safe API call.
3881 * This is not needed for `RedisModule_Reply*` calls when there is
3882 * a blocked client connected to the thread safe context. */
RM_ThreadSafeContextLock(RedisModuleCtx * ctx)3883 void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
3884 UNUSED(ctx);
3885 moduleAcquireGIL();
3886 }
3887
3888 /* Release the server lock after a thread safe API call was executed. */
RM_ThreadSafeContextUnlock(RedisModuleCtx * ctx)3889 void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
3890 UNUSED(ctx);
3891 moduleReleaseGIL();
3892 }
3893
moduleAcquireGIL(void)3894 void moduleAcquireGIL(void) {
3895 pthread_mutex_lock(&moduleGIL);
3896 }
3897
moduleReleaseGIL(void)3898 void moduleReleaseGIL(void) {
3899 pthread_mutex_unlock(&moduleGIL);
3900 }
3901
3902
3903 /* --------------------------------------------------------------------------
3904 * Module Keyspace Notifications API
3905 * -------------------------------------------------------------------------- */
3906
3907 /* Subscribe to keyspace notifications. This is a low-level version of the
3908 * keyspace-notifications API. A module can register callbacks to be notified
3909 * when keyspce events occur.
3910 *
3911 * Notification events are filtered by their type (string events, set events,
3912 * etc), and the subscriber callback receives only events that match a specific
3913 * mask of event types.
3914 *
3915 * When subscribing to notifications with RedisModule_SubscribeToKeyspaceEvents
3916 * the module must provide an event type-mask, denoting the events the subscriber
3917 * is interested in. This can be an ORed mask of any of the following flags:
3918 *
3919 * - REDISMODULE_NOTIFY_GENERIC: Generic commands like DEL, EXPIRE, RENAME
3920 * - REDISMODULE_NOTIFY_STRING: String events
3921 * - REDISMODULE_NOTIFY_LIST: List events
3922 * - REDISMODULE_NOTIFY_SET: Set events
3923 * - REDISMODULE_NOTIFY_HASH: Hash events
3924 * - REDISMODULE_NOTIFY_ZSET: Sorted Set events
3925 * - REDISMODULE_NOTIFY_EXPIRED: Expiration events
3926 * - REDISMODULE_NOTIFY_EVICTED: Eviction events
3927 * - REDISMODULE_NOTIFY_STREAM: Stream events
3928 * - REDISMODULE_NOTIFY_ALL: All events
3929 *
3930 * We do not distinguish between key events and keyspace events, and it is up
3931 * to the module to filter the actions taken based on the key.
3932 *
3933 * The subscriber signature is:
3934 *
3935 * int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type,
3936 * const char *event,
3937 * RedisModuleString *key);
3938 *
3939 * `type` is the event type bit, that must match the mask given at registration
3940 * time. The event string is the actual command being executed, and key is the
3941 * relevant Redis key.
3942 *
3943 * Notification callback gets executed with a redis context that can not be
3944 * used to send anything to the client, and has the db number where the event
3945 * occurred as its selected db number.
3946 *
3947 * Notice that it is not necessary to enable notifications in redis.conf for
3948 * module notifications to work.
3949 *
3950 * Warning: the notification callbacks are performed in a synchronous manner,
3951 * so notification callbacks must to be fast, or they would slow Redis down.
3952 * If you need to take long actions, use threads to offload them.
3953 *
3954 * See https://redis.io/topics/notifications for more information.
3955 */
RM_SubscribeToKeyspaceEvents(RedisModuleCtx * ctx,int types,RedisModuleNotificationFunc callback)3956 int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) {
3957 RedisModuleKeyspaceSubscriber *sub = zmalloc(sizeof(*sub));
3958 sub->module = ctx->module;
3959 sub->event_mask = types;
3960 sub->notify_callback = callback;
3961 sub->active = 0;
3962
3963 listAddNodeTail(moduleKeyspaceSubscribers, sub);
3964 return REDISMODULE_OK;
3965 }
3966
3967 /* Dispatcher for keyspace notifications to module subscriber functions.
3968 * This gets called only if at least one module requested to be notified on
3969 * keyspace notifications */
moduleNotifyKeyspaceEvent(int type,const char * event,robj * key,int dbid)3970 void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) {
3971 /* Don't do anything if there aren't any subscribers */
3972 if (listLength(moduleKeyspaceSubscribers) == 0) return;
3973
3974 listIter li;
3975 listNode *ln;
3976 listRewind(moduleKeyspaceSubscribers,&li);
3977
3978 /* Remove irrelevant flags from the type mask */
3979 type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE);
3980
3981 while((ln = listNext(&li))) {
3982 RedisModuleKeyspaceSubscriber *sub = ln->value;
3983 /* Only notify subscribers on events matching they registration,
3984 * and avoid subscribers triggering themselves */
3985 if ((sub->event_mask & type) && sub->active == 0) {
3986 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3987 ctx.module = sub->module;
3988 ctx.client = moduleFreeContextReusedClient;
3989 selectDb(ctx.client, dbid);
3990
3991 /* mark the handler as active to avoid reentrant loops.
3992 * If the subscriber performs an action triggering itself,
3993 * it will not be notified about it. */
3994 sub->active = 1;
3995 sub->notify_callback(&ctx, type, event, key);
3996 sub->active = 0;
3997 moduleFreeContext(&ctx);
3998 }
3999 }
4000 }
4001
4002 /* Unsubscribe any notification subscribers this module has upon unloading */
moduleUnsubscribeNotifications(RedisModule * module)4003 void moduleUnsubscribeNotifications(RedisModule *module) {
4004 listIter li;
4005 listNode *ln;
4006 listRewind(moduleKeyspaceSubscribers,&li);
4007 while((ln = listNext(&li))) {
4008 RedisModuleKeyspaceSubscriber *sub = ln->value;
4009 if (sub->module == module) {
4010 listDelNode(moduleKeyspaceSubscribers, ln);
4011 zfree(sub);
4012 }
4013 }
4014 }
4015
4016 /* --------------------------------------------------------------------------
4017 * Modules Cluster API
4018 * -------------------------------------------------------------------------- */
4019
4020 /* The Cluster message callback function pointer type. */
4021 typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
4022
4023 /* This structure identifies a registered caller: it must match a given module
4024 * ID, for a given message type. The callback function is just the function
4025 * that was registered as receiver. */
4026 typedef struct moduleClusterReceiver {
4027 uint64_t module_id;
4028 RedisModuleClusterMessageReceiver callback;
4029 struct RedisModule *module;
4030 struct moduleClusterReceiver *next;
4031 } moduleClusterReceiver;
4032
4033 typedef struct moduleClusterNodeInfo {
4034 int flags;
4035 char ip[NET_IP_STR_LEN];
4036 int port;
4037 char master_id[40]; /* Only if flags & REDISMODULE_NODE_MASTER is true. */
4038 } mdouleClusterNodeInfo;
4039
4040 /* We have an array of message types: each bucket is a linked list of
4041 * configured receivers. */
4042 static moduleClusterReceiver *clusterReceivers[UINT8_MAX];
4043
4044 /* Dispatch the message to the right module receiver. */
moduleCallClusterReceivers(const char * sender_id,uint64_t module_id,uint8_t type,const unsigned char * payload,uint32_t len)4045 void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len) {
4046 moduleClusterReceiver *r = clusterReceivers[type];
4047 while(r) {
4048 if (r->module_id == module_id) {
4049 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
4050 ctx.module = r->module;
4051 ctx.client = moduleFreeContextReusedClient;
4052 selectDb(ctx.client, 0);
4053 r->callback(&ctx,sender_id,type,payload,len);
4054 moduleFreeContext(&ctx);
4055 return;
4056 }
4057 r = r->next;
4058 }
4059 }
4060
4061 /* Register a callback receiver for cluster messages of type 'type'. If there
4062 * was already a registered callback, this will replace the callback function
4063 * with the one provided, otherwise if the callback is set to NULL and there
4064 * is already a callback for this function, the callback is unregistered
4065 * (so this API call is also used in order to delete the receiver). */
RM_RegisterClusterMessageReceiver(RedisModuleCtx * ctx,uint8_t type,RedisModuleClusterMessageReceiver callback)4066 void RM_RegisterClusterMessageReceiver(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback) {
4067 if (!server.cluster_enabled) return;
4068
4069 uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0);
4070 moduleClusterReceiver *r = clusterReceivers[type], *prev = NULL;
4071 while(r) {
4072 if (r->module_id == module_id) {
4073 /* Found! Set or delete. */
4074 if (callback) {
4075 r->callback = callback;
4076 } else {
4077 /* Delete the receiver entry if the user is setting
4078 * it to NULL. Just unlink the receiver node from the
4079 * linked list. */
4080 if (prev)
4081 prev->next = r->next;
4082 else
4083 clusterReceivers[type]->next = r->next;
4084 zfree(r);
4085 }
4086 return;
4087 }
4088 prev = r;
4089 r = r->next;
4090 }
4091
4092 /* Not found, let's add it. */
4093 if (callback) {
4094 r = zmalloc(sizeof(*r));
4095 r->module_id = module_id;
4096 r->module = ctx->module;
4097 r->callback = callback;
4098 r->next = clusterReceivers[type];
4099 clusterReceivers[type] = r;
4100 }
4101 }
4102
4103 /* Send a message to all the nodes in the cluster if `target` is NULL, otherwise
4104 * at the specified target, which is a REDISMODULE_NODE_ID_LEN bytes node ID, as
4105 * returned by the receiver callback or by the nodes iteration functions.
4106 *
4107 * The function returns REDISMODULE_OK if the message was successfully sent,
4108 * otherwise if the node is not connected or such node ID does not map to any
4109 * known cluster node, REDISMODULE_ERR is returned. */
RM_SendClusterMessage(RedisModuleCtx * ctx,char * target_id,uint8_t type,unsigned char * msg,uint32_t len)4110 int RM_SendClusterMessage(RedisModuleCtx *ctx, char *target_id, uint8_t type, unsigned char *msg, uint32_t len) {
4111 if (!server.cluster_enabled) return REDISMODULE_ERR;
4112 uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0);
4113 if (clusterSendModuleMessageToTarget(target_id,module_id,type,msg,len) == C_OK)
4114 return REDISMODULE_OK;
4115 else
4116 return REDISMODULE_ERR;
4117 }
4118
4119 /* Return an array of string pointers, each string pointer points to a cluster
4120 * node ID of exactly REDISMODULE_NODE_ID_SIZE bytes (without any null term).
4121 * The number of returned node IDs is stored into `*numnodes`.
4122 * However if this function is called by a module not running an a Redis
4123 * instance with Redis Cluster enabled, NULL is returned instead.
4124 *
4125 * The IDs returned can be used with RedisModule_GetClusterNodeInfo() in order
4126 * to get more information about single nodes.
4127 *
4128 * The array returned by this function must be freed using the function
4129 * RedisModule_FreeClusterNodesList().
4130 *
4131 * Example:
4132 *
4133 * size_t count, j;
4134 * char **ids = RedisModule_GetClusterNodesList(ctx,&count);
4135 * for (j = 0; j < count; j++) {
4136 * RedisModule_Log("notice","Node %.*s",
4137 * REDISMODULE_NODE_ID_LEN,ids[j]);
4138 * }
4139 * RedisModule_FreeClusterNodesList(ids);
4140 */
RM_GetClusterNodesList(RedisModuleCtx * ctx,size_t * numnodes)4141 char **RM_GetClusterNodesList(RedisModuleCtx *ctx, size_t *numnodes) {
4142 UNUSED(ctx);
4143
4144 if (!server.cluster_enabled) return NULL;
4145 size_t count = dictSize(server.cluster->nodes);
4146 char **ids = zmalloc((count+1)*REDISMODULE_NODE_ID_LEN);
4147 dictIterator *di = dictGetIterator(server.cluster->nodes);
4148 dictEntry *de;
4149 int j = 0;
4150 while((de = dictNext(di)) != NULL) {
4151 clusterNode *node = dictGetVal(de);
4152 if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) continue;
4153 ids[j] = zmalloc(REDISMODULE_NODE_ID_LEN);
4154 memcpy(ids[j],node->name,REDISMODULE_NODE_ID_LEN);
4155 j++;
4156 }
4157 *numnodes = j;
4158 ids[j] = NULL; /* Null term so that FreeClusterNodesList does not need
4159 * to also get the count argument. */
4160 dictReleaseIterator(di);
4161 return ids;
4162 }
4163
4164 /* Free the node list obtained with RedisModule_GetClusterNodesList. */
RM_FreeClusterNodesList(char ** ids)4165 void RM_FreeClusterNodesList(char **ids) {
4166 if (ids == NULL) return;
4167 for (int j = 0; ids[j]; j++) zfree(ids[j]);
4168 zfree(ids);
4169 }
4170
4171 /* Return this node ID (REDISMODULE_CLUSTER_ID_LEN bytes) or NULL if the cluster
4172 * is disabled. */
RM_GetMyClusterID(void)4173 const char *RM_GetMyClusterID(void) {
4174 if (!server.cluster_enabled) return NULL;
4175 return server.cluster->myself->name;
4176 }
4177
4178 /* Return the number of nodes in the cluster, regardless of their state
4179 * (handshake, noaddress, ...) so that the number of active nodes may actually
4180 * be smaller, but not greater than this number. If the instance is not in
4181 * cluster mode, zero is returned. */
RM_GetClusterSize(void)4182 size_t RM_GetClusterSize(void) {
4183 if (!server.cluster_enabled) return 0;
4184 return dictSize(server.cluster->nodes);
4185 }
4186
4187 /* Populate the specified info for the node having as ID the specified 'id',
4188 * then returns REDISMODULE_OK. Otherwise if the node ID does not exist from
4189 * the POV of this local node, REDISMODULE_ERR is returned.
4190 *
4191 * The arguments ip, master_id, port and flags can be NULL in case we don't
4192 * need to populate back certain info. If an ip and master_id (only populated
4193 * if the instance is a slave) are specified, they point to buffers holding
4194 * at least REDISMODULE_NODE_ID_LEN bytes. The strings written back as ip
4195 * and master_id are not null terminated.
4196 *
4197 * The list of flags reported is the following:
4198 *
4199 * * REDISMODULE_NODE_MYSELF This node
4200 * * REDISMODULE_NODE_MASTER The node is a master
4201 * * REDISMODULE_NODE_SLAVE The node is a replica
4202 * * REDISMODULE_NODE_PFAIL We see the node as failing
4203 * * REDISMODULE_NODE_FAIL The cluster agrees the node is failing
4204 * * REDISMODULE_NODE_NOFAILOVER The slave is configured to never failover
4205 */
4206
4207 clusterNode *clusterLookupNode(const char *name); /* We need access to internals */
4208
RM_GetClusterNodeInfo(RedisModuleCtx * ctx,const char * id,char * ip,char * master_id,int * port,int * flags)4209 int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *master_id, int *port, int *flags) {
4210 UNUSED(ctx);
4211
4212 clusterNode *node = clusterLookupNode(id);
4213 if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
4214 return REDISMODULE_ERR;
4215
4216 if (ip) memcpy(ip,node->name,REDISMODULE_NODE_ID_LEN);
4217
4218 if (master_id) {
4219 /* If the information is not available, the function will set the
4220 * field to zero bytes, so that when the field can't be populated the
4221 * function kinda remains predictable. */
4222 if (node->flags & CLUSTER_NODE_MASTER && node->slaveof)
4223 memcpy(master_id,node->slaveof->name,REDISMODULE_NODE_ID_LEN);
4224 else
4225 memset(master_id,0,REDISMODULE_NODE_ID_LEN);
4226 }
4227 if (port) *port = node->port;
4228
4229 /* As usually we have to remap flags for modules, in order to ensure
4230 * we can provide binary compatibility. */
4231 if (flags) {
4232 *flags = 0;
4233 if (node->flags & CLUSTER_NODE_MYSELF) *flags |= REDISMODULE_NODE_MYSELF;
4234 if (node->flags & CLUSTER_NODE_MASTER) *flags |= REDISMODULE_NODE_MASTER;
4235 if (node->flags & CLUSTER_NODE_SLAVE) *flags |= REDISMODULE_NODE_SLAVE;
4236 if (node->flags & CLUSTER_NODE_PFAIL) *flags |= REDISMODULE_NODE_PFAIL;
4237 if (node->flags & CLUSTER_NODE_FAIL) *flags |= REDISMODULE_NODE_FAIL;
4238 if (node->flags & CLUSTER_NODE_NOFAILOVER) *flags |= REDISMODULE_NODE_NOFAILOVER;
4239 }
4240 return REDISMODULE_OK;
4241 }
4242
4243 /* Set Redis Cluster flags in order to change the normal behavior of
4244 * Redis Cluster, especially with the goal of disabling certain functions.
4245 * This is useful for modules that use the Cluster API in order to create
4246 * a different distributed system, but still want to use the Redis Cluster
4247 * message bus. Flags that can be set:
4248 *
4249 * CLUSTER_MODULE_FLAG_NO_FAILOVER
4250 * CLUSTER_MODULE_FLAG_NO_REDIRECTION
4251 *
4252 * With the following effects:
4253 *
4254 * NO_FAILOVER: prevent Redis Cluster slaves to failover a failing master.
4255 * Also disables the replica migration feature.
4256 *
4257 * NO_REDIRECTION: Every node will accept any key, without trying to perform
4258 * partitioning according to the user Redis Cluster algorithm.
4259 * Slots informations will still be propagated across the
4260 * cluster, but without effects. */
RM_SetClusterFlags(RedisModuleCtx * ctx,uint64_t flags)4261 void RM_SetClusterFlags(RedisModuleCtx *ctx, uint64_t flags) {
4262 UNUSED(ctx);
4263 if (flags & REDISMODULE_CLUSTER_FLAG_NO_FAILOVER)
4264 server.cluster_module_flags |= CLUSTER_MODULE_FLAG_NO_FAILOVER;
4265 if (flags & REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION)
4266 server.cluster_module_flags |= CLUSTER_MODULE_FLAG_NO_REDIRECTION;
4267 }
4268
4269 /* --------------------------------------------------------------------------
4270 * Modules Timers API
4271 *
4272 * Module timers are an high precision "green timers" abstraction where
4273 * every module can register even millions of timers without problems, even if
4274 * the actual event loop will just have a single timer that is used to awake the
4275 * module timers subsystem in order to process the next event.
4276 *
4277 * All the timers are stored into a radix tree, ordered by expire time, when
4278 * the main Redis event loop timer callback is called, we try to process all
4279 * the timers already expired one after the other. Then we re-enter the event
4280 * loop registering a timer that will expire when the next to process module
4281 * timer will expire.
4282 *
4283 * Every time the list of active timers drops to zero, we unregister the
4284 * main event loop timer, so that there is no overhead when such feature is
4285 * not used.
4286 * -------------------------------------------------------------------------- */
4287
4288 static rax *Timers; /* The radix tree of all the timers sorted by expire. */
4289 long long aeTimer = -1; /* Main event loop (ae.c) timer identifier. */
4290
4291 typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
4292
4293 /* The timer descriptor, stored as value in the radix tree. */
4294 typedef struct RedisModuleTimer {
4295 RedisModule *module; /* Module reference. */
4296 RedisModuleTimerProc callback; /* The callback to invoke on expire. */
4297 void *data; /* Private data for the callback. */
4298 int dbid; /* Database number selected by the original client. */
4299 } RedisModuleTimer;
4300
4301 /* This is the timer handler that is called by the main event loop. We schedule
4302 * this timer to be called when the nearest of our module timers will expire. */
moduleTimerHandler(struct aeEventLoop * eventLoop,long long id,void * clientData)4303 int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *clientData) {
4304 UNUSED(eventLoop);
4305 UNUSED(id);
4306 UNUSED(clientData);
4307
4308 /* To start let's try to fire all the timers already expired. */
4309 raxIterator ri;
4310 raxStart(&ri,Timers);
4311 uint64_t now = ustime();
4312 long long next_period = 0;
4313 while(1) {
4314 raxSeek(&ri,"^",NULL,0);
4315 if (!raxNext(&ri)) break;
4316 uint64_t expiretime;
4317 memcpy(&expiretime,ri.key,sizeof(expiretime));
4318 expiretime = ntohu64(expiretime);
4319 if (now >= expiretime) {
4320 RedisModuleTimer *timer = ri.data;
4321 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
4322
4323 ctx.module = timer->module;
4324 ctx.client = moduleFreeContextReusedClient;
4325 selectDb(ctx.client, timer->dbid);
4326 timer->callback(&ctx,timer->data);
4327 moduleFreeContext(&ctx);
4328 raxRemove(Timers,(unsigned char*)ri.key,ri.key_len,NULL);
4329 zfree(timer);
4330 } else {
4331 next_period = (expiretime-now)/1000; /* Scale to milliseconds. */
4332 break;
4333 }
4334 }
4335 raxStop(&ri);
4336
4337 /* Reschedule the next timer or cancel it. */
4338 if (next_period <= 0) next_period = 1;
4339 return (raxSize(Timers) > 0) ? next_period : AE_NOMORE;
4340 }
4341
4342 /* Create a new timer that will fire after `period` milliseconds, and will call
4343 * the specified function using `data` as argument. The returned timer ID can be
4344 * used to get information from the timer or to stop it before it fires. */
RM_CreateTimer(RedisModuleCtx * ctx,mstime_t period,RedisModuleTimerProc callback,void * data)4345 RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) {
4346 RedisModuleTimer *timer = zmalloc(sizeof(*timer));
4347 timer->module = ctx->module;
4348 timer->callback = callback;
4349 timer->data = data;
4350 timer->dbid = ctx->client->db->id;
4351 uint64_t expiretime = ustime()+period*1000;
4352 uint64_t key;
4353
4354 while(1) {
4355 key = htonu64(expiretime);
4356 if (raxFind(Timers, (unsigned char*)&key,sizeof(key)) == raxNotFound) {
4357 raxInsert(Timers,(unsigned char*)&key,sizeof(key),timer,NULL);
4358 break;
4359 } else {
4360 expiretime++;
4361 }
4362 }
4363
4364 /* We need to install the main event loop timer if it's not already
4365 * installed, or we may need to refresh its period if we just installed
4366 * a timer that will expire sooner than any other else. */
4367 if (aeTimer != -1) {
4368 raxIterator ri;
4369 raxStart(&ri,Timers);
4370 raxSeek(&ri,"^",NULL,0);
4371 raxNext(&ri);
4372 if (memcmp(ri.key,&key,sizeof(key)) == 0) {
4373 /* This is the first key, we need to re-install the timer according
4374 * to the just added event. */
4375 aeDeleteTimeEvent(server.el,aeTimer);
4376 aeTimer = -1;
4377 }
4378 raxStop(&ri);
4379 }
4380
4381 /* If we have no main timer (the old one was invalidated, or this is the
4382 * first module timer we have), install one. */
4383 if (aeTimer == -1)
4384 aeTimer = aeCreateTimeEvent(server.el,period,moduleTimerHandler,NULL,NULL);
4385
4386 return key;
4387 }
4388
4389 /* Stop a timer, returns REDISMODULE_OK if the timer was found, belonged to the
4390 * calling module, and was stopped, otherwise REDISMODULE_ERR is returned.
4391 * If not NULL, the data pointer is set to the value of the data argument when
4392 * the timer was created. */
RM_StopTimer(RedisModuleCtx * ctx,RedisModuleTimerID id,void ** data)4393 int RM_StopTimer(RedisModuleCtx *ctx, RedisModuleTimerID id, void **data) {
4394 RedisModuleTimer *timer = raxFind(Timers,(unsigned char*)&id,sizeof(id));
4395 if (timer == raxNotFound || timer->module != ctx->module)
4396 return REDISMODULE_ERR;
4397 if (data) *data = timer->data;
4398 raxRemove(Timers,(unsigned char*)&id,sizeof(id),NULL);
4399 zfree(timer);
4400 return REDISMODULE_OK;
4401 }
4402
4403 /* Obtain information about a timer: its remaining time before firing
4404 * (in milliseconds), and the private data pointer associated with the timer.
4405 * If the timer specified does not exist or belongs to a different module
4406 * no information is returned and the function returns REDISMODULE_ERR, otherwise
4407 * REDISMODULE_OK is returned. The arguments remaining or data can be NULL if
4408 * the caller does not need certain information. */
RM_GetTimerInfo(RedisModuleCtx * ctx,RedisModuleTimerID id,uint64_t * remaining,void ** data)4409 int RM_GetTimerInfo(RedisModuleCtx *ctx, RedisModuleTimerID id, uint64_t *remaining, void **data) {
4410 RedisModuleTimer *timer = raxFind(Timers,(unsigned char*)&id,sizeof(id));
4411 if (timer == raxNotFound || timer->module != ctx->module)
4412 return REDISMODULE_ERR;
4413 if (remaining) {
4414 int64_t rem = ntohu64(id)-ustime();
4415 if (rem < 0) rem = 0;
4416 *remaining = rem/1000; /* Scale to milliseconds. */
4417 }
4418 if (data) *data = timer->data;
4419 return REDISMODULE_OK;
4420 }
4421
4422 /* --------------------------------------------------------------------------
4423 * Modules Dictionary API
4424 *
4425 * Implements a sorted dictionary (actually backed by a radix tree) with
4426 * the usual get / set / del / num-items API, together with an iterator
4427 * capable of going back and forth.
4428 * -------------------------------------------------------------------------- */
4429
4430 /* Create a new dictionary. The 'ctx' pointer can be the current module context
4431 * or NULL, depending on what you want. Please follow the following rules:
4432 *
4433 * 1. Use a NULL context if you plan to retain a reference to this dictionary
4434 * that will survive the time of the module callback where you created it.
4435 * 2. Use a NULL context if no context is available at the time you are creating
4436 * the dictionary (of course...).
4437 * 3. However use the current callback context as 'ctx' argument if the
4438 * dictionary time to live is just limited to the callback scope. In this
4439 * case, if enabled, you can enjoy the automatic memory management that will
4440 * reclaim the dictionary memory, as well as the strings returned by the
4441 * Next / Prev dictionary iterator calls.
4442 */
RM_CreateDict(RedisModuleCtx * ctx)4443 RedisModuleDict *RM_CreateDict(RedisModuleCtx *ctx) {
4444 struct RedisModuleDict *d = zmalloc(sizeof(*d));
4445 d->rax = raxNew();
4446 if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_DICT,d);
4447 return d;
4448 }
4449
4450 /* Free a dictionary created with RM_CreateDict(). You need to pass the
4451 * context pointer 'ctx' only if the dictionary was created using the
4452 * context instead of passing NULL. */
RM_FreeDict(RedisModuleCtx * ctx,RedisModuleDict * d)4453 void RM_FreeDict(RedisModuleCtx *ctx, RedisModuleDict *d) {
4454 if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_DICT,d);
4455 raxFree(d->rax);
4456 zfree(d);
4457 }
4458
4459 /* Return the size of the dictionary (number of keys). */
RM_DictSize(RedisModuleDict * d)4460 uint64_t RM_DictSize(RedisModuleDict *d) {
4461 return raxSize(d->rax);
4462 }
4463
4464 /* Store the specified key into the dictionary, setting its value to the
4465 * pointer 'ptr'. If the key was added with success, since it did not
4466 * already exist, REDISMODULE_OK is returned. Otherwise if the key already
4467 * exists the function returns REDISMODULE_ERR. */
RM_DictSetC(RedisModuleDict * d,void * key,size_t keylen,void * ptr)4468 int RM_DictSetC(RedisModuleDict *d, void *key, size_t keylen, void *ptr) {
4469 int retval = raxTryInsert(d->rax,key,keylen,ptr,NULL);
4470 return (retval == 1) ? REDISMODULE_OK : REDISMODULE_ERR;
4471 }
4472
4473 /* Like RedisModule_DictSetC() but will replace the key with the new
4474 * value if the key already exists. */
RM_DictReplaceC(RedisModuleDict * d,void * key,size_t keylen,void * ptr)4475 int RM_DictReplaceC(RedisModuleDict *d, void *key, size_t keylen, void *ptr) {
4476 int retval = raxInsert(d->rax,key,keylen,ptr,NULL);
4477 return (retval == 1) ? REDISMODULE_OK : REDISMODULE_ERR;
4478 }
4479
4480 /* Like RedisModule_DictSetC() but takes the key as a RedisModuleString. */
RM_DictSet(RedisModuleDict * d,RedisModuleString * key,void * ptr)4481 int RM_DictSet(RedisModuleDict *d, RedisModuleString *key, void *ptr) {
4482 return RM_DictSetC(d,key->ptr,sdslen(key->ptr),ptr);
4483 }
4484
4485 /* Like RedisModule_DictReplaceC() but takes the key as a RedisModuleString. */
RM_DictReplace(RedisModuleDict * d,RedisModuleString * key,void * ptr)4486 int RM_DictReplace(RedisModuleDict *d, RedisModuleString *key, void *ptr) {
4487 return RM_DictReplaceC(d,key->ptr,sdslen(key->ptr),ptr);
4488 }
4489
4490 /* Return the value stored at the specified key. The function returns NULL
4491 * both in the case the key does not exist, or if you actually stored
4492 * NULL at key. So, optionally, if the 'nokey' pointer is not NULL, it will
4493 * be set by reference to 1 if the key does not exist, or to 0 if the key
4494 * exists. */
RM_DictGetC(RedisModuleDict * d,void * key,size_t keylen,int * nokey)4495 void *RM_DictGetC(RedisModuleDict *d, void *key, size_t keylen, int *nokey) {
4496 void *res = raxFind(d->rax,key,keylen);
4497 if (nokey) *nokey = (res == raxNotFound);
4498 return (res == raxNotFound) ? NULL : res;
4499 }
4500
4501 /* Like RedisModule_DictGetC() but takes the key as a RedisModuleString. */
RM_DictGet(RedisModuleDict * d,RedisModuleString * key,int * nokey)4502 void *RM_DictGet(RedisModuleDict *d, RedisModuleString *key, int *nokey) {
4503 return RM_DictGetC(d,key->ptr,sdslen(key->ptr),nokey);
4504 }
4505
4506 /* Remove the specified key from the dictionary, returning REDISMODULE_OK if
4507 * the key was found and delted, or REDISMODULE_ERR if instead there was
4508 * no such key in the dictionary. When the operation is successful, if
4509 * 'oldval' is not NULL, then '*oldval' is set to the value stored at the
4510 * key before it was deleted. Using this feature it is possible to get
4511 * a pointer to the value (for instance in order to release it), without
4512 * having to call RedisModule_DictGet() before deleting the key. */
RM_DictDelC(RedisModuleDict * d,void * key,size_t keylen,void * oldval)4513 int RM_DictDelC(RedisModuleDict *d, void *key, size_t keylen, void *oldval) {
4514 int retval = raxRemove(d->rax,key,keylen,oldval);
4515 return retval ? REDISMODULE_OK : REDISMODULE_ERR;
4516 }
4517
4518 /* Like RedisModule_DictDelC() but gets the key as a RedisModuleString. */
RM_DictDel(RedisModuleDict * d,RedisModuleString * key,void * oldval)4519 int RM_DictDel(RedisModuleDict *d, RedisModuleString *key, void *oldval) {
4520 return RM_DictDelC(d,key->ptr,sdslen(key->ptr),oldval);
4521 }
4522
4523 /* Return an interator, setup in order to start iterating from the specified
4524 * key by applying the operator 'op', which is just a string specifying the
4525 * comparison operator to use in order to seek the first element. The
4526 * operators avalable are:
4527 *
4528 * "^" -- Seek the first (lexicographically smaller) key.
4529 * "$" -- Seek the last (lexicographically biffer) key.
4530 * ">" -- Seek the first element greter than the specified key.
4531 * ">=" -- Seek the first element greater or equal than the specified key.
4532 * "<" -- Seek the first element smaller than the specified key.
4533 * "<=" -- Seek the first element smaller or equal than the specified key.
4534 * "==" -- Seek the first element matching exactly the specified key.
4535 *
4536 * Note that for "^" and "$" the passed key is not used, and the user may
4537 * just pass NULL with a length of 0.
4538 *
4539 * If the element to start the iteration cannot be seeked based on the
4540 * key and operator passed, RedisModule_DictNext() / Prev() will just return
4541 * REDISMODULE_ERR at the first call, otherwise they'll produce elements.
4542 */
RM_DictIteratorStartC(RedisModuleDict * d,const char * op,void * key,size_t keylen)4543 RedisModuleDictIter *RM_DictIteratorStartC(RedisModuleDict *d, const char *op, void *key, size_t keylen) {
4544 RedisModuleDictIter *di = zmalloc(sizeof(*di));
4545 di->dict = d;
4546 raxStart(&di->ri,d->rax);
4547 raxSeek(&di->ri,op,key,keylen);
4548 return di;
4549 }
4550
4551 /* Exactly like RedisModule_DictIteratorStartC, but the key is passed as a
4552 * RedisModuleString. */
RM_DictIteratorStart(RedisModuleDict * d,const char * op,RedisModuleString * key)4553 RedisModuleDictIter *RM_DictIteratorStart(RedisModuleDict *d, const char *op, RedisModuleString *key) {
4554 return RM_DictIteratorStartC(d,op,key->ptr,sdslen(key->ptr));
4555 }
4556
4557 /* Release the iterator created with RedisModule_DictIteratorStart(). This call
4558 * is mandatory otherwise a memory leak is introduced in the module. */
RM_DictIteratorStop(RedisModuleDictIter * di)4559 void RM_DictIteratorStop(RedisModuleDictIter *di) {
4560 raxStop(&di->ri);
4561 zfree(di);
4562 }
4563
4564 /* After its creation with RedisModule_DictIteratorStart(), it is possible to
4565 * change the currently selected element of the iterator by using this
4566 * API call. The result based on the operator and key is exactly like
4567 * the function RedisModule_DictIteratorStart(), however in this case the
4568 * return value is just REDISMODULE_OK in case the seeked element was found,
4569 * or REDISMODULE_ERR in case it was not possible to seek the specified
4570 * element. It is possible to reseek an iterator as many times as you want. */
RM_DictIteratorReseekC(RedisModuleDictIter * di,const char * op,void * key,size_t keylen)4571 int RM_DictIteratorReseekC(RedisModuleDictIter *di, const char *op, void *key, size_t keylen) {
4572 return raxSeek(&di->ri,op,key,keylen);
4573 }
4574
4575 /* Like RedisModule_DictIteratorReseekC() but takes the key as as a
4576 * RedisModuleString. */
RM_DictIteratorReseek(RedisModuleDictIter * di,const char * op,RedisModuleString * key)4577 int RM_DictIteratorReseek(RedisModuleDictIter *di, const char *op, RedisModuleString *key) {
4578 return RM_DictIteratorReseekC(di,op,key->ptr,sdslen(key->ptr));
4579 }
4580
4581 /* Return the current item of the dictionary iterator 'di' and steps to the
4582 * next element. If the iterator already yield the last element and there
4583 * are no other elements to return, NULL is returned, otherwise a pointer
4584 * to a string representing the key is provided, and the '*keylen' length
4585 * is set by reference (if keylen is not NULL). The '*dataptr', if not NULL
4586 * is set to the value of the pointer stored at the returned key as auxiliary
4587 * data (as set by the RedisModule_DictSet API).
4588 *
4589 * Usage example:
4590 *
4591 * ... create the iterator here ...
4592 * char *key;
4593 * void *data;
4594 * while((key = RedisModule_DictNextC(iter,&keylen,&data)) != NULL) {
4595 * printf("%.*s %p\n", (int)keylen, key, data);
4596 * }
4597 *
4598 * The returned pointer is of type void because sometimes it makes sense
4599 * to cast it to a char* sometimes to an unsigned char* depending on the
4600 * fact it contains or not binary data, so this API ends being more
4601 * comfortable to use.
4602 *
4603 * The validity of the returned pointer is until the next call to the
4604 * next/prev iterator step. Also the pointer is no longer valid once the
4605 * iterator is released. */
RM_DictNextC(RedisModuleDictIter * di,size_t * keylen,void ** dataptr)4606 void *RM_DictNextC(RedisModuleDictIter *di, size_t *keylen, void **dataptr) {
4607 if (!raxNext(&di->ri)) return NULL;
4608 if (keylen) *keylen = di->ri.key_len;
4609 if (dataptr) *dataptr = di->ri.data;
4610 return di->ri.key;
4611 }
4612
4613 /* This function is exactly like RedisModule_DictNext() but after returning
4614 * the currently selected element in the iterator, it selects the previous
4615 * element (laxicographically smaller) instead of the next one. */
RM_DictPrevC(RedisModuleDictIter * di,size_t * keylen,void ** dataptr)4616 void *RM_DictPrevC(RedisModuleDictIter *di, size_t *keylen, void **dataptr) {
4617 if (!raxPrev(&di->ri)) return NULL;
4618 if (keylen) *keylen = di->ri.key_len;
4619 if (dataptr) *dataptr = di->ri.data;
4620 return di->ri.key;
4621 }
4622
4623 /* Like RedisModuleNextC(), but instead of returning an internally allocated
4624 * buffer and key length, it returns directly a module string object allocated
4625 * in the specified context 'ctx' (that may be NULL exactly like for the main
4626 * API RedisModule_CreateString).
4627 *
4628 * The returned string object should be deallocated after use, either manually
4629 * or by using a context that has automatic memory management active. */
RM_DictNext(RedisModuleCtx * ctx,RedisModuleDictIter * di,void ** dataptr)4630 RedisModuleString *RM_DictNext(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr) {
4631 size_t keylen;
4632 void *key = RM_DictNextC(di,&keylen,dataptr);
4633 if (key == NULL) return NULL;
4634 return RM_CreateString(ctx,key,keylen);
4635 }
4636
4637 /* Like RedisModule_DictNext() but after returning the currently selected
4638 * element in the iterator, it selects the previous element (laxicographically
4639 * smaller) instead of the next one. */
RM_DictPrev(RedisModuleCtx * ctx,RedisModuleDictIter * di,void ** dataptr)4640 RedisModuleString *RM_DictPrev(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr) {
4641 size_t keylen;
4642 void *key = RM_DictPrevC(di,&keylen,dataptr);
4643 if (key == NULL) return NULL;
4644 return RM_CreateString(ctx,key,keylen);
4645 }
4646
4647 /* Compare the element currently pointed by the iterator to the specified
4648 * element given by key/keylen, according to the operator 'op' (the set of
4649 * valid operators are the same valid for RedisModule_DictIteratorStart).
4650 * If the comparision is successful the command returns REDISMODULE_OK
4651 * otherwise REDISMODULE_ERR is returned.
4652 *
4653 * This is useful when we want to just emit a lexicographical range, so
4654 * in the loop, as we iterate elements, we can also check if we are still
4655 * on range.
4656 *
4657 * The function returne REDISMODULE_ERR if the iterator reached the
4658 * end of elements condition as well. */
RM_DictCompareC(RedisModuleDictIter * di,const char * op,void * key,size_t keylen)4659 int RM_DictCompareC(RedisModuleDictIter *di, const char *op, void *key, size_t keylen) {
4660 if (raxEOF(&di->ri)) return REDISMODULE_ERR;
4661 int res = raxCompare(&di->ri,op,key,keylen);
4662 return res ? REDISMODULE_OK : REDISMODULE_ERR;
4663 }
4664
4665 /* Like RedisModule_DictCompareC but gets the key to compare with the current
4666 * iterator key as a RedisModuleString. */
RM_DictCompare(RedisModuleDictIter * di,const char * op,RedisModuleString * key)4667 int RM_DictCompare(RedisModuleDictIter *di, const char *op, RedisModuleString *key) {
4668 if (raxEOF(&di->ri)) return REDISMODULE_ERR;
4669 int res = raxCompare(&di->ri,op,key->ptr,sdslen(key->ptr));
4670 return res ? REDISMODULE_OK : REDISMODULE_ERR;
4671 }
4672
4673 /* --------------------------------------------------------------------------
4674 * Modules utility APIs
4675 * -------------------------------------------------------------------------- */
4676
4677 /* Return random bytes using SHA1 in counter mode with a /dev/urandom
4678 * initialized seed. This function is fast so can be used to generate
4679 * many bytes without any effect on the operating system entropy pool.
4680 * Currently this function is not thread safe. */
RM_GetRandomBytes(unsigned char * dst,size_t len)4681 void RM_GetRandomBytes(unsigned char *dst, size_t len) {
4682 getRandomBytes(dst,len);
4683 }
4684
4685 /* Like RedisModule_GetRandomBytes() but instead of setting the string to
4686 * random bytes the string is set to random characters in the in the
4687 * hex charset [0-9a-f]. */
RM_GetRandomHexChars(char * dst,size_t len)4688 void RM_GetRandomHexChars(char *dst, size_t len) {
4689 getRandomHexChars(dst,len);
4690 }
4691
4692 /* --------------------------------------------------------------------------
4693 * Modules API exporting / importing
4694 * -------------------------------------------------------------------------- */
4695
4696 /* This function is called by a module in order to export some API with a
4697 * given name. Other modules will be able to use this API by calling the
4698 * symmetrical function RM_GetSharedAPI() and casting the return value to
4699 * the right function pointer.
4700 *
4701 * The function will return REDISMODULE_OK if the name is not already taken,
4702 * otherwise REDISMODULE_ERR will be returned and no operation will be
4703 * performed.
4704 *
4705 * IMPORTANT: the apiname argument should be a string literal with static
4706 * lifetime. The API relies on the fact that it will always be valid in
4707 * the future. */
RM_ExportSharedAPI(RedisModuleCtx * ctx,const char * apiname,void * func)4708 int RM_ExportSharedAPI(RedisModuleCtx *ctx, const char *apiname, void *func) {
4709 RedisModuleSharedAPI *sapi = zmalloc(sizeof(*sapi));
4710 sapi->module = ctx->module;
4711 sapi->func = func;
4712 if (dictAdd(server.sharedapi, (char*)apiname, sapi) != DICT_OK) {
4713 zfree(sapi);
4714 return REDISMODULE_ERR;
4715 }
4716 return REDISMODULE_OK;
4717 }
4718
4719 /* Request an exported API pointer. The return value is just a void pointer
4720 * that the caller of this function will be required to cast to the right
4721 * function pointer, so this is a private contract between modules.
4722 *
4723 * If the requested API is not available then NULL is returned. Because
4724 * modules can be loaded at different times with different order, this
4725 * function calls should be put inside some module generic API registering
4726 * step, that is called every time a module attempts to execute a
4727 * command that requires external APIs: if some API cannot be resolved, the
4728 * command should return an error.
4729 *
4730 * Here is an exmaple:
4731 *
4732 * int ... myCommandImplementation() {
4733 * if (getExternalAPIs() == 0) {
4734 * reply with an error here if we cannot have the APIs
4735 * }
4736 * // Use the API:
4737 * myFunctionPointer(foo);
4738 * }
4739 *
4740 * And the function registerAPI() is:
4741 *
4742 * int getExternalAPIs(void) {
4743 * static int api_loaded = 0;
4744 * if (api_loaded != 0) return 1; // APIs already resolved.
4745 *
4746 * myFunctionPointer = RedisModule_GetOtherModuleAPI("...");
4747 * if (myFunctionPointer == NULL) return 0;
4748 *
4749 * return 1;
4750 * }
4751 */
RM_GetSharedAPI(RedisModuleCtx * ctx,const char * apiname)4752 void *RM_GetSharedAPI(RedisModuleCtx *ctx, const char *apiname) {
4753 dictEntry *de = dictFind(server.sharedapi, apiname);
4754 if (de == NULL) return NULL;
4755 RedisModuleSharedAPI *sapi = dictGetVal(de);
4756 if (listSearchKey(sapi->module->usedby,ctx->module) == NULL) {
4757 listAddNodeTail(sapi->module->usedby,ctx->module);
4758 listAddNodeTail(ctx->module->using,sapi->module);
4759 }
4760 return sapi->func;
4761 }
4762
4763 /* Remove all the APIs registered by the specified module. Usually you
4764 * want this when the module is going to be unloaded. This function
4765 * assumes that's caller responsibility to make sure the APIs are not
4766 * used by other modules.
4767 *
4768 * The number of unregistered APIs is returned. */
moduleUnregisterSharedAPI(RedisModule * module)4769 int moduleUnregisterSharedAPI(RedisModule *module) {
4770 int count = 0;
4771 dictIterator *di = dictGetSafeIterator(server.sharedapi);
4772 dictEntry *de;
4773 while ((de = dictNext(di)) != NULL) {
4774 const char *apiname = dictGetKey(de);
4775 RedisModuleSharedAPI *sapi = dictGetVal(de);
4776 if (sapi->module == module) {
4777 dictDelete(server.sharedapi,apiname);
4778 zfree(sapi);
4779 count++;
4780 }
4781 }
4782 dictReleaseIterator(di);
4783 return count;
4784 }
4785
4786 /* Remove the specified module as an user of APIs of ever other module.
4787 * This is usually called when a module is unloaded.
4788 *
4789 * Returns the number of modules this module was using APIs from. */
moduleUnregisterUsedAPI(RedisModule * module)4790 int moduleUnregisterUsedAPI(RedisModule *module) {
4791 listIter li;
4792 listNode *ln;
4793 int count = 0;
4794
4795 listRewind(module->using,&li);
4796 while((ln = listNext(&li))) {
4797 RedisModule *used = ln->value;
4798 listNode *ln = listSearchKey(used->usedby,module);
4799 if (ln) {
4800 listDelNode(module->using,ln);
4801 count++;
4802 }
4803 }
4804 return count;
4805 }
4806
4807 /* Unregister all filters registered by a module.
4808 * This is called when a module is being unloaded.
4809 *
4810 * Returns the number of filters unregistered. */
moduleUnregisterFilters(RedisModule * module)4811 int moduleUnregisterFilters(RedisModule *module) {
4812 listIter li;
4813 listNode *ln;
4814 int count = 0;
4815
4816 listRewind(module->filters,&li);
4817 while((ln = listNext(&li))) {
4818 RedisModuleCommandFilter *filter = ln->value;
4819 listNode *ln = listSearchKey(moduleCommandFilters,filter);
4820 if (ln) {
4821 listDelNode(moduleCommandFilters,ln);
4822 count++;
4823 }
4824 zfree(filter);
4825 }
4826 return count;
4827 }
4828
4829 /* --------------------------------------------------------------------------
4830 * Module Command Filter API
4831 * -------------------------------------------------------------------------- */
4832
4833 /* Register a new command filter function.
4834 *
4835 * Command filtering makes it possible for modules to extend Redis by plugging
4836 * into the execution flow of all commands.
4837 *
4838 * A registered filter gets called before Redis executes *any* command. This
4839 * includes both core Redis commands and commands registered by any module. The
4840 * filter applies in all execution paths including:
4841 *
4842 * 1. Invocation by a client.
4843 * 2. Invocation through `RedisModule_Call()` by any module.
4844 * 3. Invocation through Lua 'redis.call()`.
4845 * 4. Replication of a command from a master.
4846 *
4847 * The filter executes in a special filter context, which is different and more
4848 * limited than a RedisModuleCtx. Because the filter affects any command, it
4849 * must be implemented in a very efficient way to reduce the performance impact
4850 * on Redis. All Redis Module API calls that require a valid context (such as
4851 * `RedisModule_Call()`, `RedisModule_OpenKey()`, etc.) are not supported in a
4852 * filter context.
4853 *
4854 * The `RedisModuleCommandFilterCtx` can be used to inspect or modify the
4855 * executed command and its arguments. As the filter executes before Redis
4856 * begins processing the command, any change will affect the way the command is
4857 * processed. For example, a module can override Redis commands this way:
4858 *
4859 * 1. Register a `MODULE.SET` command which implements an extended version of
4860 * the Redis `SET` command.
4861 * 2. Register a command filter which detects invocation of `SET` on a specific
4862 * pattern of keys. Once detected, the filter will replace the first
4863 * argument from `SET` to `MODULE.SET`.
4864 * 3. When filter execution is complete, Redis considers the new command name
4865 * and therefore executes the module's own command.
4866 *
4867 * Note that in the above use case, if `MODULE.SET` itself uses
4868 * `RedisModule_Call()` the filter will be applied on that call as well. If
4869 * that is not desired, the `REDISMODULE_CMDFILTER_NOSELF` flag can be set when
4870 * registering the filter.
4871 *
4872 * The `REDISMODULE_CMDFILTER_NOSELF` flag prevents execution flows that
4873 * originate from the module's own `RM_Call()` from reaching the filter. This
4874 * flag is effective for all execution flows, including nested ones, as long as
4875 * the execution begins from the module's command context or a thread-safe
4876 * context that is associated with a blocking command.
4877 *
4878 * Detached thread-safe contexts are *not* associated with the module and cannot
4879 * be protected by this flag.
4880 *
4881 * If multiple filters are registered (by the same or different modules), they
4882 * are executed in the order of registration.
4883 */
4884
RM_RegisterCommandFilter(RedisModuleCtx * ctx,RedisModuleCommandFilterFunc callback,int flags)4885 RedisModuleCommandFilter *RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback, int flags) {
4886 RedisModuleCommandFilter *filter = zmalloc(sizeof(*filter));
4887 filter->module = ctx->module;
4888 filter->callback = callback;
4889 filter->flags = flags;
4890
4891 listAddNodeTail(moduleCommandFilters, filter);
4892 listAddNodeTail(ctx->module->filters, filter);
4893 return filter;
4894 }
4895
4896 /* Unregister a command filter.
4897 */
RM_UnregisterCommandFilter(RedisModuleCtx * ctx,RedisModuleCommandFilter * filter)4898 int RM_UnregisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilter *filter) {
4899 listNode *ln;
4900
4901 /* A module can only remove its own filters */
4902 if (filter->module != ctx->module) return REDISMODULE_ERR;
4903
4904 ln = listSearchKey(moduleCommandFilters,filter);
4905 if (!ln) return REDISMODULE_ERR;
4906 listDelNode(moduleCommandFilters,ln);
4907
4908 ln = listSearchKey(ctx->module->filters,filter);
4909 if (!ln) return REDISMODULE_ERR; /* Shouldn't happen */
4910 listDelNode(ctx->module->filters,ln);
4911
4912 return REDISMODULE_OK;
4913 }
4914
moduleCallCommandFilters(client * c)4915 void moduleCallCommandFilters(client *c) {
4916 if (listLength(moduleCommandFilters) == 0) return;
4917
4918 listIter li;
4919 listNode *ln;
4920 listRewind(moduleCommandFilters,&li);
4921
4922 RedisModuleCommandFilterCtx filter = {
4923 .argv = c->argv,
4924 .argc = c->argc
4925 };
4926
4927 while((ln = listNext(&li))) {
4928 RedisModuleCommandFilter *f = ln->value;
4929
4930 /* Skip filter if REDISMODULE_CMDFILTER_NOSELF is set and module is
4931 * currently processing a command.
4932 */
4933 if ((f->flags & REDISMODULE_CMDFILTER_NOSELF) && f->module->in_call) continue;
4934
4935 /* Call filter */
4936 f->callback(&filter);
4937 }
4938
4939 c->argv = filter.argv;
4940 c->argc = filter.argc;
4941 }
4942
4943 /* Return the number of arguments a filtered command has. The number of
4944 * arguments include the command itself.
4945 */
RM_CommandFilterArgsCount(RedisModuleCommandFilterCtx * fctx)4946 int RM_CommandFilterArgsCount(RedisModuleCommandFilterCtx *fctx)
4947 {
4948 return fctx->argc;
4949 }
4950
4951 /* Return the specified command argument. The first argument (position 0) is
4952 * the command itself, and the rest are user-provided args.
4953 */
RM_CommandFilterArgGet(RedisModuleCommandFilterCtx * fctx,int pos)4954 const RedisModuleString *RM_CommandFilterArgGet(RedisModuleCommandFilterCtx *fctx, int pos)
4955 {
4956 if (pos < 0 || pos >= fctx->argc) return NULL;
4957 return fctx->argv[pos];
4958 }
4959
4960 /* Modify the filtered command by inserting a new argument at the specified
4961 * position. The specified RedisModuleString argument may be used by Redis
4962 * after the filter context is destroyed, so it must not be auto-memory
4963 * allocated, freed or used elsewhere.
4964 */
4965
RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx * fctx,int pos,RedisModuleString * arg)4966 int RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg)
4967 {
4968 int i;
4969
4970 if (pos < 0 || pos > fctx->argc) return REDISMODULE_ERR;
4971
4972 fctx->argv = zrealloc(fctx->argv, (fctx->argc+1)*sizeof(RedisModuleString *));
4973 for (i = fctx->argc; i > pos; i--) {
4974 fctx->argv[i] = fctx->argv[i-1];
4975 }
4976 fctx->argv[pos] = arg;
4977 fctx->argc++;
4978
4979 return REDISMODULE_OK;
4980 }
4981
4982 /* Modify the filtered command by replacing an existing argument with a new one.
4983 * The specified RedisModuleString argument may be used by Redis after the
4984 * filter context is destroyed, so it must not be auto-memory allocated, freed
4985 * or used elsewhere.
4986 */
4987
RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx * fctx,int pos,RedisModuleString * arg)4988 int RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg)
4989 {
4990 if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR;
4991
4992 decrRefCount(fctx->argv[pos]);
4993 fctx->argv[pos] = arg;
4994
4995 return REDISMODULE_OK;
4996 }
4997
4998 /* Modify the filtered command by deleting an argument at the specified
4999 * position.
5000 */
RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx * fctx,int pos)5001 int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos)
5002 {
5003 int i;
5004 if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR;
5005
5006 decrRefCount(fctx->argv[pos]);
5007 for (i = pos; i < fctx->argc-1; i++) {
5008 fctx->argv[i] = fctx->argv[i+1];
5009 }
5010 fctx->argc--;
5011
5012 return REDISMODULE_OK;
5013 }
5014
5015 /* --------------------------------------------------------------------------
5016 * Modules API internals
5017 * -------------------------------------------------------------------------- */
5018
5019 /* server.moduleapi dictionary type. Only uses plain C strings since
5020 * this gets queries from modules. */
5021
dictCStringKeyHash(const void * key)5022 uint64_t dictCStringKeyHash(const void *key) {
5023 return dictGenHashFunction((unsigned char*)key, strlen((char*)key));
5024 }
5025
dictCStringKeyCompare(void * privdata,const void * key1,const void * key2)5026 int dictCStringKeyCompare(void *privdata, const void *key1, const void *key2) {
5027 UNUSED(privdata);
5028 return strcmp(key1,key2) == 0;
5029 }
5030
5031 dictType moduleAPIDictType = {
5032 dictCStringKeyHash, /* hash function */
5033 NULL, /* key dup */
5034 NULL, /* val dup */
5035 dictCStringKeyCompare, /* key compare */
5036 NULL, /* key destructor */
5037 NULL /* val destructor */
5038 };
5039
moduleRegisterApi(const char * funcname,void * funcptr)5040 int moduleRegisterApi(const char *funcname, void *funcptr) {
5041 return dictAdd(server.moduleapi, (char*)funcname, funcptr);
5042 }
5043
5044 #define REGISTER_API(name) \
5045 moduleRegisterApi("RedisModule_" #name, (void *)(unsigned long)RM_ ## name)
5046
5047 /* Global initialization at Redis startup. */
5048 void moduleRegisterCoreAPI(void);
5049
moduleInitModulesSystem(void)5050 void moduleInitModulesSystem(void) {
5051 moduleUnblockedClients = listCreate();
5052 server.loadmodule_queue = listCreate();
5053 modules = dictCreate(&modulesDictType,NULL);
5054
5055 /* Set up the keyspace notification susbscriber list and static client */
5056 moduleKeyspaceSubscribers = listCreate();
5057 moduleFreeContextReusedClient = createClient(-1);
5058 moduleFreeContextReusedClient->flags |= CLIENT_MODULE;
5059
5060 /* Set up filter list */
5061 moduleCommandFilters = listCreate();
5062
5063 moduleRegisterCoreAPI();
5064 if (pipe(server.module_blocked_pipe) == -1) {
5065 serverLog(LL_WARNING,
5066 "Can't create the pipe for module blocking commands: %s",
5067 strerror(errno));
5068 exit(1);
5069 }
5070 /* Make the pipe non blocking. This is just a best effort aware mechanism
5071 * and we do not want to block not in the read nor in the write half. */
5072 anetNonBlock(NULL,server.module_blocked_pipe[0]);
5073 anetNonBlock(NULL,server.module_blocked_pipe[1]);
5074
5075 /* Create the timers radix tree. */
5076 Timers = raxNew();
5077
5078 /* Our thread-safe contexts GIL must start with already locked:
5079 * it is just unlocked when it's safe. */
5080 pthread_mutex_lock(&moduleGIL);
5081 }
5082
5083 /* Load all the modules in the server.loadmodule_queue list, which is
5084 * populated by `loadmodule` directives in the configuration file.
5085 * We can't load modules directly when processing the configuration file
5086 * because the server must be fully initialized before loading modules.
5087 *
5088 * The function aborts the server on errors, since to start with missing
5089 * modules is not considered sane: clients may rely on the existence of
5090 * given commands, loading AOF also may need some modules to exist, and
5091 * if this instance is a slave, it must understand commands from master. */
moduleLoadFromQueue(void)5092 void moduleLoadFromQueue(void) {
5093 listIter li;
5094 listNode *ln;
5095
5096 listRewind(server.loadmodule_queue,&li);
5097 while((ln = listNext(&li))) {
5098 struct moduleLoadQueueEntry *loadmod = ln->value;
5099 if (moduleLoad(loadmod->path,(void **)loadmod->argv,loadmod->argc)
5100 == C_ERR)
5101 {
5102 serverLog(LL_WARNING,
5103 "Can't load module from %s: server aborting",
5104 loadmod->path);
5105 exit(1);
5106 }
5107 }
5108 }
5109
moduleFreeModuleStructure(struct RedisModule * module)5110 void moduleFreeModuleStructure(struct RedisModule *module) {
5111 listRelease(module->types);
5112 listRelease(module->filters);
5113 sdsfree(module->name);
5114 zfree(module);
5115 }
5116
moduleUnregisterCommands(struct RedisModule * module)5117 void moduleUnregisterCommands(struct RedisModule *module) {
5118 /* Unregister all the commands registered by this module. */
5119 dictIterator *di = dictGetSafeIterator(server.commands);
5120 dictEntry *de;
5121 while ((de = dictNext(di)) != NULL) {
5122 struct redisCommand *cmd = dictGetVal(de);
5123 if (cmd->proc == RedisModuleCommandDispatcher) {
5124 RedisModuleCommandProxy *cp =
5125 (void*)(unsigned long)cmd->getkeys_proc;
5126 sds cmdname = cp->rediscmd->name;
5127 if (cp->module == module) {
5128 dictDelete(server.commands,cmdname);
5129 dictDelete(server.orig_commands,cmdname);
5130 sdsfree(cmdname);
5131 zfree(cp->rediscmd);
5132 zfree(cp);
5133 }
5134 }
5135 }
5136 dictReleaseIterator(di);
5137 }
5138
5139 /* Load a module and initialize it. On success C_OK is returned, otherwise
5140 * C_ERR is returned. */
moduleLoad(const char * path,void ** module_argv,int module_argc)5141 int moduleLoad(const char *path, void **module_argv, int module_argc) {
5142 int (*onload)(void *, void **, int);
5143 void *handle;
5144 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
5145
5146 handle = dlopen(path,RTLD_NOW|RTLD_LOCAL);
5147 if (handle == NULL) {
5148 serverLog(LL_WARNING, "Module %s failed to load: %s", path, dlerror());
5149 return C_ERR;
5150 }
5151 onload = (int (*)(void *, void **, int))(unsigned long) dlsym(handle,"RedisModule_OnLoad");
5152 if (onload == NULL) {
5153 dlclose(handle);
5154 serverLog(LL_WARNING,
5155 "Module %s does not export RedisModule_OnLoad() "
5156 "symbol. Module not loaded.",path);
5157 return C_ERR;
5158 }
5159 if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) {
5160 if (ctx.module) {
5161 moduleUnregisterCommands(ctx.module);
5162 moduleUnregisterSharedAPI(ctx.module);
5163 moduleUnregisterUsedAPI(ctx.module);
5164 moduleFreeModuleStructure(ctx.module);
5165 }
5166 dlclose(handle);
5167 serverLog(LL_WARNING,
5168 "Module %s initialization failed. Module not loaded",path);
5169 return C_ERR;
5170 }
5171
5172 /* Redis module loaded! Register it. */
5173 dictAdd(modules,ctx.module->name,ctx.module);
5174 ctx.module->handle = handle;
5175 serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path);
5176 moduleFreeContext(&ctx);
5177 return C_OK;
5178 }
5179
5180
5181 /* Unload the module registered with the specified name. On success
5182 * C_OK is returned, otherwise C_ERR is returned and errno is set
5183 * to the following values depending on the type of error:
5184 *
5185 * * ENONET: No such module having the specified name.
5186 * * EBUSY: The module exports a new data type and can only be reloaded. */
moduleUnload(sds name)5187 int moduleUnload(sds name) {
5188 struct RedisModule *module = dictFetchValue(modules,name);
5189
5190 if (module == NULL) {
5191 errno = ENOENT;
5192 return REDISMODULE_ERR;
5193 } else if (listLength(module->types)) {
5194 errno = EBUSY;
5195 return REDISMODULE_ERR;
5196 } else if (listLength(module->usedby)) {
5197 errno = EPERM;
5198 return REDISMODULE_ERR;
5199 }
5200
5201 moduleUnregisterCommands(module);
5202 moduleUnregisterSharedAPI(module);
5203 moduleUnregisterUsedAPI(module);
5204 moduleUnregisterFilters(module);
5205
5206 /* Remove any notification subscribers this module might have */
5207 moduleUnsubscribeNotifications(module);
5208
5209 /* Unregister all the hooks. TODO: Yet no hooks support here. */
5210
5211 /* Unload the dynamic library. */
5212 if (dlclose(module->handle) == -1) {
5213 char *error = dlerror();
5214 if (error == NULL) error = "Unknown error";
5215 serverLog(LL_WARNING,"Error when trying to close the %s module: %s",
5216 module->name, error);
5217 }
5218
5219 /* Remove from list of modules. */
5220 serverLog(LL_NOTICE,"Module %s unloaded",module->name);
5221 dictDelete(modules,module->name);
5222 module->name = NULL; /* The name was already freed by dictDelete(). */
5223 moduleFreeModuleStructure(module);
5224
5225 return REDISMODULE_OK;
5226 }
5227
5228 /* Redis MODULE command.
5229 *
5230 * MODULE LOAD <path> [args...] */
moduleCommand(client * c)5231 void moduleCommand(client *c) {
5232 char *subcmd = c->argv[1]->ptr;
5233 if (c->argc == 2 && !strcasecmp(subcmd,"help")) {
5234 const char *help[] = {
5235 "LIST -- Return a list of loaded modules.",
5236 "LOAD <path> [arg ...] -- Load a module library from <path>.",
5237 "UNLOAD <name> -- Unload a module.",
5238 NULL
5239 };
5240 addReplyHelp(c, help);
5241 } else
5242 if (!strcasecmp(subcmd,"load") && c->argc >= 3) {
5243 robj **argv = NULL;
5244 int argc = 0;
5245
5246 if (c->argc > 3) {
5247 argc = c->argc - 3;
5248 argv = &c->argv[3];
5249 }
5250
5251 if (moduleLoad(c->argv[2]->ptr,(void **)argv,argc) == C_OK)
5252 addReply(c,shared.ok);
5253 else
5254 addReplyError(c,
5255 "Error loading the extension. Please check the server logs.");
5256 } else if (!strcasecmp(subcmd,"unload") && c->argc == 3) {
5257 if (moduleUnload(c->argv[2]->ptr) == C_OK)
5258 addReply(c,shared.ok);
5259 else {
5260 char *errmsg;
5261 switch(errno) {
5262 case ENOENT:
5263 errmsg = "no such module with that name";
5264 break;
5265 case EBUSY:
5266 errmsg = "the module exports one or more module-side data "
5267 "types, can't unload";
5268 break;
5269 case EPERM:
5270 errmsg = "the module exports APIs used by other modules. "
5271 "Please unload them first and try again";
5272 break;
5273 default:
5274 errmsg = "operation not possible.";
5275 break;
5276 }
5277 addReplyErrorFormat(c,"Error unloading module: %s",errmsg);
5278 }
5279 } else if (!strcasecmp(subcmd,"list") && c->argc == 2) {
5280 dictIterator *di = dictGetIterator(modules);
5281 dictEntry *de;
5282
5283 addReplyMultiBulkLen(c,dictSize(modules));
5284 while ((de = dictNext(di)) != NULL) {
5285 sds name = dictGetKey(de);
5286 struct RedisModule *module = dictGetVal(de);
5287 addReplyMultiBulkLen(c,4);
5288 addReplyBulkCString(c,"name");
5289 addReplyBulkCBuffer(c,name,sdslen(name));
5290 addReplyBulkCString(c,"ver");
5291 addReplyLongLong(c,module->ver);
5292 }
5293 dictReleaseIterator(di);
5294 } else {
5295 addReplySubcommandSyntaxError(c);
5296 return;
5297 }
5298 }
5299
5300 /* Return the number of registered modules. */
moduleCount(void)5301 size_t moduleCount(void) {
5302 return dictSize(modules);
5303 }
5304
5305 /* Register all the APIs we export. Keep this function at the end of the
5306 * file so that's easy to seek it to add new entries. */
moduleRegisterCoreAPI(void)5307 void moduleRegisterCoreAPI(void) {
5308 server.moduleapi = dictCreate(&moduleAPIDictType,NULL);
5309 server.sharedapi = dictCreate(&moduleAPIDictType,NULL);
5310 REGISTER_API(Alloc);
5311 REGISTER_API(Calloc);
5312 REGISTER_API(Realloc);
5313 REGISTER_API(Free);
5314 REGISTER_API(Strdup);
5315 REGISTER_API(CreateCommand);
5316 REGISTER_API(SetModuleAttribs);
5317 REGISTER_API(IsModuleNameBusy);
5318 REGISTER_API(WrongArity);
5319 REGISTER_API(ReplyWithLongLong);
5320 REGISTER_API(ReplyWithError);
5321 REGISTER_API(ReplyWithSimpleString);
5322 REGISTER_API(ReplyWithArray);
5323 REGISTER_API(ReplySetArrayLength);
5324 REGISTER_API(ReplyWithString);
5325 REGISTER_API(ReplyWithStringBuffer);
5326 REGISTER_API(ReplyWithNull);
5327 REGISTER_API(ReplyWithCallReply);
5328 REGISTER_API(ReplyWithDouble);
5329 REGISTER_API(GetSelectedDb);
5330 REGISTER_API(SelectDb);
5331 REGISTER_API(OpenKey);
5332 REGISTER_API(CloseKey);
5333 REGISTER_API(KeyType);
5334 REGISTER_API(ValueLength);
5335 REGISTER_API(ListPush);
5336 REGISTER_API(ListPop);
5337 REGISTER_API(StringToLongLong);
5338 REGISTER_API(StringToDouble);
5339 REGISTER_API(Call);
5340 REGISTER_API(CallReplyProto);
5341 REGISTER_API(FreeCallReply);
5342 REGISTER_API(CallReplyInteger);
5343 REGISTER_API(CallReplyType);
5344 REGISTER_API(CallReplyLength);
5345 REGISTER_API(CallReplyArrayElement);
5346 REGISTER_API(CallReplyStringPtr);
5347 REGISTER_API(CreateStringFromCallReply);
5348 REGISTER_API(CreateString);
5349 REGISTER_API(CreateStringFromLongLong);
5350 REGISTER_API(CreateStringFromString);
5351 REGISTER_API(CreateStringPrintf);
5352 REGISTER_API(FreeString);
5353 REGISTER_API(StringPtrLen);
5354 REGISTER_API(AutoMemory);
5355 REGISTER_API(Replicate);
5356 REGISTER_API(ReplicateVerbatim);
5357 REGISTER_API(DeleteKey);
5358 REGISTER_API(UnlinkKey);
5359 REGISTER_API(StringSet);
5360 REGISTER_API(StringDMA);
5361 REGISTER_API(StringTruncate);
5362 REGISTER_API(SetExpire);
5363 REGISTER_API(GetExpire);
5364 REGISTER_API(ZsetAdd);
5365 REGISTER_API(ZsetIncrby);
5366 REGISTER_API(ZsetScore);
5367 REGISTER_API(ZsetRem);
5368 REGISTER_API(ZsetRangeStop);
5369 REGISTER_API(ZsetFirstInScoreRange);
5370 REGISTER_API(ZsetLastInScoreRange);
5371 REGISTER_API(ZsetFirstInLexRange);
5372 REGISTER_API(ZsetLastInLexRange);
5373 REGISTER_API(ZsetRangeCurrentElement);
5374 REGISTER_API(ZsetRangeNext);
5375 REGISTER_API(ZsetRangePrev);
5376 REGISTER_API(ZsetRangeEndReached);
5377 REGISTER_API(HashSet);
5378 REGISTER_API(HashGet);
5379 REGISTER_API(IsKeysPositionRequest);
5380 REGISTER_API(KeyAtPos);
5381 REGISTER_API(GetClientId);
5382 REGISTER_API(GetContextFlags);
5383 REGISTER_API(PoolAlloc);
5384 REGISTER_API(CreateDataType);
5385 REGISTER_API(ModuleTypeSetValue);
5386 REGISTER_API(ModuleTypeGetType);
5387 REGISTER_API(ModuleTypeGetValue);
5388 REGISTER_API(SaveUnsigned);
5389 REGISTER_API(LoadUnsigned);
5390 REGISTER_API(SaveSigned);
5391 REGISTER_API(LoadSigned);
5392 REGISTER_API(SaveString);
5393 REGISTER_API(SaveStringBuffer);
5394 REGISTER_API(LoadString);
5395 REGISTER_API(LoadStringBuffer);
5396 REGISTER_API(SaveDouble);
5397 REGISTER_API(LoadDouble);
5398 REGISTER_API(SaveFloat);
5399 REGISTER_API(LoadFloat);
5400 REGISTER_API(EmitAOF);
5401 REGISTER_API(Log);
5402 REGISTER_API(LogIOError);
5403 REGISTER_API(StringAppendBuffer);
5404 REGISTER_API(RetainString);
5405 REGISTER_API(StringCompare);
5406 REGISTER_API(GetContextFromIO);
5407 REGISTER_API(GetKeyNameFromIO);
5408 REGISTER_API(BlockClient);
5409 REGISTER_API(UnblockClient);
5410 REGISTER_API(IsBlockedReplyRequest);
5411 REGISTER_API(IsBlockedTimeoutRequest);
5412 REGISTER_API(GetBlockedClientPrivateData);
5413 REGISTER_API(AbortBlock);
5414 REGISTER_API(Milliseconds);
5415 REGISTER_API(GetThreadSafeContext);
5416 REGISTER_API(FreeThreadSafeContext);
5417 REGISTER_API(ThreadSafeContextLock);
5418 REGISTER_API(ThreadSafeContextUnlock);
5419 REGISTER_API(DigestAddStringBuffer);
5420 REGISTER_API(DigestAddLongLong);
5421 REGISTER_API(DigestEndSequence);
5422 REGISTER_API(SubscribeToKeyspaceEvents);
5423 REGISTER_API(RegisterClusterMessageReceiver);
5424 REGISTER_API(SendClusterMessage);
5425 REGISTER_API(GetClusterNodeInfo);
5426 REGISTER_API(GetClusterNodesList);
5427 REGISTER_API(FreeClusterNodesList);
5428 REGISTER_API(CreateTimer);
5429 REGISTER_API(StopTimer);
5430 REGISTER_API(GetTimerInfo);
5431 REGISTER_API(GetMyClusterID);
5432 REGISTER_API(GetClusterSize);
5433 REGISTER_API(GetRandomBytes);
5434 REGISTER_API(GetRandomHexChars);
5435 REGISTER_API(BlockedClientDisconnected);
5436 REGISTER_API(SetDisconnectCallback);
5437 REGISTER_API(GetBlockedClientHandle);
5438 REGISTER_API(SetClusterFlags);
5439 REGISTER_API(CreateDict);
5440 REGISTER_API(FreeDict);
5441 REGISTER_API(DictSize);
5442 REGISTER_API(DictSetC);
5443 REGISTER_API(DictReplaceC);
5444 REGISTER_API(DictSet);
5445 REGISTER_API(DictReplace);
5446 REGISTER_API(DictGetC);
5447 REGISTER_API(DictGet);
5448 REGISTER_API(DictDelC);
5449 REGISTER_API(DictDel);
5450 REGISTER_API(DictIteratorStartC);
5451 REGISTER_API(DictIteratorStart);
5452 REGISTER_API(DictIteratorStop);
5453 REGISTER_API(DictIteratorReseekC);
5454 REGISTER_API(DictIteratorReseek);
5455 REGISTER_API(DictNextC);
5456 REGISTER_API(DictPrevC);
5457 REGISTER_API(DictNext);
5458 REGISTER_API(DictPrev);
5459 REGISTER_API(DictCompareC);
5460 REGISTER_API(DictCompare);
5461 REGISTER_API(ExportSharedAPI);
5462 REGISTER_API(GetSharedAPI);
5463 REGISTER_API(RegisterCommandFilter);
5464 REGISTER_API(UnregisterCommandFilter);
5465 REGISTER_API(CommandFilterArgsCount);
5466 REGISTER_API(CommandFilterArgGet);
5467 REGISTER_API(CommandFilterArgInsert);
5468 REGISTER_API(CommandFilterArgReplace);
5469 REGISTER_API(CommandFilterArgDelete);
5470 }
5471